### This notebook shows how we use pandas' datatype autodetection to create a schema for Spark DataFrames. We then use Spark to write our files in Parquet format.

In [1]:
import pyspark
import os
import sys
import pandas as pd
from pyspark.sql import SparkSession


# Point both the PySpark workers and the driver at the interpreter that is
# running this notebook, so they all use the same Python executable.
os.environ.update({
    'PYSPARK_PYTHON': sys.executable,
    'PYSPARK_DRIVER_PYTHON': sys.executable,
})

In [2]:
# Create (or reuse an already running) Spark session with master "local[*]"
# and application name 'test'.
spark = (
    SparkSession.builder
    .master("local[*]")
    .appName('test')
    .getOrCreate()
)

In [11]:
# Read only the first 1000 rows into pandas: a small sample used to inspect
# the column dtypes that pandas infers.
appointment_pd = pd.read_csv(
    '../data/noshowappointment.csv',
    nrows=1000,
)

In [12]:
# Preview the first rows of the pandas sample.
appointment_pd.head()

Unnamed: 0,PatientId,AppointmentID,Gender,ScheduledDay,AppointmentDay,Age,Neighbourhood,Scholarship,Hipertension,Diabetes,Alcoholism,Handcap,SMS_received,No-show
0,29872499824296,5642903,F,2016-04-29T18:38:08Z,2016-04-29T00:00:00Z,62,JARDIM DA PENHA,0,1,0,0,0,0,No
1,558997776694438,5642503,M,2016-04-29T16:08:27Z,2016-04-29T00:00:00Z,56,JARDIM DA PENHA,0,0,0,0,0,0,No
2,4262962299951,5642549,F,2016-04-29T16:19:04Z,2016-04-29T00:00:00Z,62,MATA DA PRAIA,0,0,0,0,0,0,No
3,867951213174,5642828,F,2016-04-29T17:29:31Z,2016-04-29T00:00:00Z,8,PONTAL DE CAMBURI,0,0,0,0,0,0,No
4,8841186448183,5642494,F,2016-04-29T16:07:23Z,2016-04-29T00:00:00Z,56,JARDIM DA PENHA,0,1,1,0,0,0,No


In [35]:
# Summary statistics (count, mean, std, min, quartiles, max) for the numeric columns of the sample.
appointment_pd.describe()

Unnamed: 0,PatientId,AppointmentID,Age,Scholarship,Hipertension,Diabetes,Alcoholism,Handcap,SMS_received
count,1000.0,1000.0,1000.0,1000.0,1000.0,1000.0,1000.0,1000.0,1000.0
mean,140290200000000.0,5586122.0,37.888,0.092,0.224,0.098,0.021,0.021,0.431
std,245062900000000.0,68470.01,24.367709,0.289171,0.417131,0.297463,0.143456,0.150272,0.495464
min,318385300.0,5217179.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0
25%,4216810000000.0,5542643.0,18.0,0.0,0.0,0.0,0.0,0.0,0.0
50%,28163010000000.0,5617144.0,36.5,0.0,0.0,0.0,0.0,0.0,0.0
75%,93981690000000.0,5638446.0,58.0,0.0,0.0,0.0,0.0,0.0,1.0
max,996868400000000.0,5642904.0,98.0,1.0,1.0,1.0,1.0,2.0,1.0


In [13]:
# Show the dtype pandas inferred for each column; used as a guide for the Spark schema.
appointment_pd.dtypes

PatientId          int64
AppointmentID      int64
Gender            object
ScheduledDay      object
AppointmentDay    object
Age                int64
Neighbourhood     object
Scholarship        int64
Hipertension       int64
Diabetes           int64
Alcoholism         int64
Handcap            int64
SMS_received       int64
No-show           object
dtype: object

In [14]:
from pyspark.sql import types

In [44]:
# Explicit Spark schema for the appointments CSV, one nullable field per column.
# Identifier and date columns are read as strings (pandas inferred int64 for
# the two ID columns); Age and the 0/1-style indicator columns are integers.
initial_schema = (
    types.StructType()
    .add("PatientId", types.StringType(), True)
    .add("AppointmentID", types.StringType(), True)
    .add("Gender", types.StringType(), True)
    .add("ScheduledDay", types.StringType(), True)
    .add("AppointmentDay", types.StringType(), True)
    .add("Age", types.IntegerType(), True)
    .add("Neighbourhood", types.StringType(), True)
    .add("Scholarship", types.IntegerType(), True)
    .add("Hipertension", types.IntegerType(), True)
    .add("Diabetes", types.IntegerType(), True)
    .add("Alcoholism", types.IntegerType(), True)
    .add("Handcap", types.IntegerType(), True)
    .add("SMS_received", types.IntegerType(), True)
    .add("No-show", types.StringType(), True)
)

In [45]:
# Load the full CSV with the explicit schema (first line is a header), then
# fix the misspelled column names and replace the hyphen in 'No-show' with an
# underscore.
df_appointment = (
    spark.read
    .option("header", "true")
    .schema(initial_schema)
    .csv('../data/noshowappointment.csv')
    .withColumnRenamed('Hipertension', 'Hypertension')
    .withColumnRenamed('Handcap', 'Handicap')
    .withColumnRenamed('No-show', 'No_show')
)


In [46]:
# Expose the DataFrame to Spark SQL under the name 'raw_temp'.
# createOrReplaceTempView replaces registerTempTable, which has been deprecated
# since Spark 2.0; re-running this cell simply replaces the existing view.
df_appointment.createOrReplaceTempView('raw_temp')

In [50]:
# Build the cleaned result set from the 'raw_temp' view:
#   * ScheduledDay / AppointmentDay keep only their first 10 characters
#     (the YYYY-MM-DD part of values such as 2016-04-29T18:38:08Z).
#   * Handicap is selected alongside the other columns; it is renamed from
#     'Handcap' when the CSV is loaded but was previously missing from this
#     projection, so it was silently dropped from the output.
# NOTE(review): `Age > 0` also removes rows with Age = 0 (the pandas sample has
# a minimum age of 0) — confirm that excluding them is intended.
df_result = spark.sql("""
SELECT
    -- date column cleaning
    LEFT(ScheduledDay,10) AS ScheduledDay,
    LEFT(AppointmentDay,10) AS AppointmentDay,

    -- Other Columns
    PatientId,
    AppointmentID,
    Gender,
    Age,
    Neighbourhood,
    Scholarship,
    Hypertension,
    Diabetes,
    Alcoholism,
    Handicap,
    SMS_received,
    No_show

FROM
    raw_temp
    -- filtering age column
WHERE Age > 0
""")

In [52]:
# Collapse the result to a single partition and write it as Parquet,
# overwriting any existing output at the target path.
(
    df_result
    .coalesce(1)
    .write
    .mode('overwrite')
    .parquet('../capstone/cleaned_data')
)

In [None]:
# Upload the capstone/ output directory to GCS via the gsutil CLI:
# gsutil -m cp -r capstone/ gs://dtc_data_lake_khunmi-academy-376002/capstone