## Imports

In [1]:
import findspark
findspark.init()
from pyspark.context import SparkContext, SparkConf
from pyspark.sql.functions import *
from pyspark.sql.types import IntegerType, ArrayType, StringType, ShortType, FloatType
from pyspark.sql import SparkSession
from spark_functions import *


  from tqdm.autonotebook import tqdm, trange


In [12]:
#spark = SparkSession.builder.remote("sc://localhost").getOrCreate()


In [31]:
# Reuse the active SparkContext if there is one, otherwise create it.
# `SparkContext()` would first construct a *new* context (which raises when one
# is already running, e.g. on re-run) before `getOrCreate` is ever reached, so
# call the classmethod directly.
# NOTE(review): under Spark Connect (remote mode) no SparkContext can be created
# client-side at all; the session cell below does not depend on `sc`.
sc = SparkContext.getOrCreate()

RuntimeError: Remote client cannot create a SparkContext. Create SparkSession instead.

In [3]:
# Build (or reuse) the SparkSession for this job.
# `builder` is a class attribute, so there is no need to construct a session from
# `sc` first; `getOrCreate()` reuses any active SparkContext on its own, which also
# keeps this cell working when the SparkContext cell above fails.
spark = (
    SparkSession.builder
    .appName("dica33")
    # .master("local[1]")  # uncomment to force a single-threaded local run
    .getOrCreate()
)
spark

<pyspark.sql.session.SparkSession object at 0x0000021B645B2D10>


## Custom transformations for dica33

In [13]:
# dica33 category label -> shared specialty name.
# Several source labels may map to the same specialty (e.g. 'Psicologia').
# Consumed by `map_category` (from spark_functions) through `mapping_udf`.
category_mapping = {
    'Alimentazione': "Scienza dell'alimentazione",
    'Allergie': 'Allergologia e immunologia',
    'Apparato respiratorio': 'Pneumologia',
    'Bocca e denti': 'Ortodonzia',
    'Chirurgia estetica': 'Chirurgia plastica e ricostruttiva',
    'Cuore circolazione e malattie del sangue': 'Cardiologia',
    'Diabete tiroide e ghiandole': 'Endocrinologia',
    'Farmaci e cure': 'Farmacologia',
    'Fegato': 'Epatologia',
    'Infanzia': 'Pediatria',
    'Malattie infettive': 'Malattie infettive',
    'Mente e cervello': 'Psicologia',
    'Occhio e vista': 'Oculistica',
    'Orecchie naso e gola': 'Otorinolaringoiatria',
    'Pelle': 'Dermatologia e venereologia',
    'Rene e vie urinarie': 'Urologia',
    'Salute femminile': 'Ginecologia e ostetricia',
    'Salute maschile': 'Andrologia',
    'Scheletro e Articolazioni': 'Ortopedia',
    'Sessualità': 'Psicologia',
    'Stomaco e intestino': 'Gastroenterologia e endoscopia digestiva',
    'Tumori': 'Oncologia medica',
}

# Italian month name (lower case) -> two-digit month number, "01" .. "12".
_ITALIAN_MONTHS = (
    "gennaio", "febbraio", "marzo", "aprile", "maggio", "giugno",
    "luglio", "agosto", "settembre", "ottobre", "novembre", "dicembre",
)
months_map = {name: f"{number:02d}" for number, name in enumerate(_ITALIAN_MONTHS, start=1)}

# Sentinel returned for dates that cannot be parsed (kept for downstream compatibility).
INVALID_ISO_DATE = "0000-00-00T00:00:00Z"


def convert_to_iso(date_str, month_lookup=None):
    """Convert an Italian long-form date such as ``"5 marzo 2020"`` to ISO-8601.

    Parameters
    ----------
    date_str : str or None
        Whitespace-separated ``"<day> <month name> <year>"``. Tokens after the
        third are ignored, and month names are matched case-insensitively.
    month_lookup : dict, optional
        Maps a lower-case month name to its month number (``"03"`` or ``3``).
        Defaults to ``months_map``.

    Returns
    -------
    str
        ``"YYYY-MM-DDT00:00:00Z"`` with a zero-padded day and month. Returns
        ``INVALID_ISO_DATE`` when the input is missing or malformed, uses an
        unknown month name, or names a day that does not exist (e.g. 31 febbraio).
    """
    # Imported locally so the function stays self-contained when shipped to Spark workers.
    from datetime import date

    if month_lookup is None:
        month_lookup = months_map
    try:
        day, month_name, year = date_str.split()[:3]
        parsed = date(
            int(year),
            int(month_lookup[month_name.lower()]),
            # Italian dates often write the first of the month as "1°".
            int(day.rstrip("°º")),
        )
    except (AttributeError, KeyError, TypeError, ValueError):
        # AttributeError: None / non-string input (Spark passes nulls as None).
        # ValueError: fewer than three tokens, non-numeric parts, or an impossible date.
        # KeyError: month name not present in the lookup.
        return INVALID_ISO_DATE
    return f"{parsed.isoformat()}T00:00:00Z"
    

def split_dataframe(df):
    """Project the merged dica33 frame into the three per-store frames.

    Returns
    -------
    tuple
        ``(df_rag, df_doctors, df_analytics)``, where

        - RAG frame: URL, Category, Question, Answer
        - doctors frame: Doctor profile, Location
        - analytics frame: URL, Category, Answer Date, Question Date
    """
    column_sets = (
        ['URL', 'Category', 'Question', 'Answer'],
        ['Doctor profile', 'Location'],
        ['URL', 'Category', 'Answer Date', 'Question Date'],
    )
    return tuple(df.select(columns) for columns in column_sets)

## UDFs

In [14]:
# Spark UDF wrappers. chunking, embed, get_coordinates and map_category are
# imported from spark_functions.py; category_mapping and convert_to_iso are
# defined in the notebook cell above.
CHUNK_SIZE = 300        # second argument to chunking(); presumably a chunk length — see spark_functions
GEOCODE_COUNTRY = "IT"  # second argument to get_coordinates(); presumably a country code


@udf(returnType=ArrayType(StringType()))
def chunking_udf(text):
    return chunking(text, CHUNK_SIZE)


@udf(returnType=ArrayType(FloatType()))
def embeddings_udf(text):
    return embed(text)


@udf(returnType=ArrayType(FloatType()))
def locations_udf(location):
    return get_coordinates(location, GEOCODE_COUNTRY)


@udf(returnType=StringType())
def mapping_udf(category):
    return map_category(category, category_mapping)


@udf(returnType=StringType())
def convert_to_iso_udf(date_str):
    return convert_to_iso(date_str)


## General transformations

In [7]:
# Load the raw scrape and keep one row per URL and then per question text.
dica33_raw = load_dataframe(spark, '../dica33/data')
dica33_unique = dica33_raw.dropDuplicates(['URL']).dropDuplicates(['Question'])
# Map dica33's own categories onto the predefined shared categories.
dica33_df = dica33_unique.withColumn('Category', mapping_udf(dica33_unique['Category']))


../dica33/data loaded


AnalysisException: [PATH_NOT_FOUND] Path does not exist: file:/opt/spark/dica33/data.

## Splitting

In [7]:
# Project the cleaned frame into one DataFrame per downstream store
# (RAG, doctors, analytics).
dica33_split = split_dataframe(dica33_df)
df_rag, df_doctors, df_analytics = dica33_split

## Transformations for df_rag

In [13]:
# Keep only Q&A pairs where both texts are longer than MIN_TEXT_CHARS characters
# and neither is null.
MIN_TEXT_CHARS = 30
df_rag = (
    df_rag
    .where((length(col('Question')) > MIN_TEXT_CHARS) & (length(col('Answer')) > MIN_TEXT_CHARS))
    .na.drop(how='any', subset=['Question', 'Answer'])
)

In [14]:
# Split each question into chunks and emit one row per chunk. Chunk_number is the
# chunk's position within its source question (from posexplode).
# NOTE: not idempotent — re-running this cell alone would re-chunk already-chunked
# text and produce a duplicate Chunk_number column; re-run the cells above first.
df_rag = (
    df_rag
    .withColumn('Question', chunking_udf(col('Question')))
    .select('*', posexplode('Question').alias('Chunk_number', 'Chunked_Question'))
    .drop('Question')
    .withColumnRenamed('Chunked_Question', 'Question')
)
# Embedding step currently disabled:
# df_rag = df_rag.withColumn('embeddings', embeddings_udf(df_rag['Question']))

## Transformations for df_analytics

In [8]:
# Drop rows whose Category contains 'MISSING' (presumably categories that
# map_category could not map — confirm in spark_functions) or is null, then
# normalise both date columns to ISO-8601 strings.
df_analytics = (
    df_analytics
    .where(~col('Category').contains('MISSING'))
    .na.drop(how='any', subset=['Category'])
    .withColumn('Question Date', convert_to_iso_udf(col('Question Date')))
    .withColumn('Answer Date', convert_to_iso_udf(col('Answer Date')))
)

## Transformations for df_doctors

In [11]:
# Clean the doctor rows, geocode each distinct (lower-cased) location once,
# then fan back out to one row per doctor profile.
df_doctors = (
    df_doctors
    .dropDuplicates(['Doctor profile'])
    .na.drop(how='any', subset=['Doctor profile', 'Location'])
    # Group by the lower-cased location so locations_udf runs once per place.
    # Reference 'Location' with its real casing so this does not depend on
    # spark.sql.caseSensitive being false; the output column stays 'location'.
    .groupby(lower(col('Location')).alias('location'))
    .agg(collect_list(col('Doctor profile')).alias('Doctor profile'))
)
df_doctors = df_doctors.withColumn('coordinates', locations_udf(df_doctors['location']))
# NOTE(review): assumes get_coordinates signals a failed lookup with a leading 0.0;
# null coordinates are also dropped here because the comparison yields null — confirm.
df_doctors = df_doctors.where(col('coordinates').getItem(0) != 0.0)
df_doctors = df_doctors.withColumn('Doctor profile', explode('Doctor profile'))

In [10]:
# NOTE(review): this repeats the de-duplication and null filtering from the top of
# the previous cell. When run after that cell, each doctor profile already appears
# once with a non-null location, so this is likely a no-op — consider deleting it.
# After the groupby the column is named 'location'; 'Location' below resolves to it
# only because Spark matches column names case-insensitively by default.
df_doctors = (
    df_doctors
    .dropDuplicates(['Doctor profile'])
    .na.drop(how='any', subset=['Doctor profile', 'Location'])
)

## Saving

In [12]:
# Persist each projection as JSON under ../dica33/json_dica33/<name>,
# replacing any previous output (written in the order rag, analytics, doctors).
JSON_OUTPUT_DIR = '../dica33/json_dica33'
outputs = {'rag': df_rag, 'analytics': df_analytics, 'doctors': df_doctors}
for output_name, output_df in outputs.items():
    output_df.write.json(f'{JSON_OUTPUT_DIR}/{output_name}', mode="overwrite")