# Spark with Python: pyspark

PySpark is the high-level Python API for Apache Spark: it lets you use the Spark framework from Python. See the [Spark documentation](https://spark.apache.org/docs/latest/api/python/index.html#).

The first cell loads the libraries we will need throughout this notebook.

In [1]:
import findspark
import pandas as pd
import pyspark
from datetime import datetime
from pyspark.sql import SparkSession
from pyspark.sql import functions as F
from pyspark.sql.functions import col, desc, isnan, udf
from pyspark.sql.types import DateType, IntegerType

First, we have to indicate where Spark is installed on our machine: we do this with `findspark.init()`.

Then we instantiate a Spark session through a builder, where `.appName()` sets a name for our application.

`getOrCreate()` either creates a new session or returns one that already exists; in the latter case, the options set on the builder are applied to the existing session:

In [2]:
findspark.init("/usr/local/spark")

spark = SparkSession \
            .builder \
            .appName("Learning Spark with Python") \
            .getOrCreate() # if a session already exists, it is reused and updated with the settings above; otherwise a new session is created.

To visualize the parameters of our session:

In [3]:
spark.sparkContext.getConf().getAll()

[('spark.driver.port', '36853'),
 ('spark.driver.host', '192.168.0.10'),
 ('spark.app.id', 'local-1587753245018'),
 ('spark.rdd.compress', 'True'),
 ('spark.serializer.objectStreamReset', '100'),
 ('spark.master', 'local[*]'),
 ('spark.executor.id', 'driver'),
 ('spark.submit.deployMode', 'client'),
 ('spark.app.name', 'Learning Spark with Python'),
 ('spark.ui.showConsoleProgress', 'true')]

Let's use a dataset in CSV format ([available here](https://www.kaggle.com/joniarroba/noshowappointments)).

Set the path to the dataset, then read it and save it as a DataFrame:

In [4]:
path = 'data/noshowappointments-kagglev2-may-2016.csv'

In [5]:
noshow_dataset = spark.read.csv(path, header = True)

The method `printSchema()` prints the column names and data types. Because the file was read without schema inference, every column is a string:

In [6]:
noshow_dataset.printSchema()

root
 |-- PatientId: string (nullable = true)
 |-- AppointmentID: string (nullable = true)
 |-- Gender: string (nullable = true)
 |-- ScheduledDay: string (nullable = true)
 |-- AppointmentDay: string (nullable = true)
 |-- Age: string (nullable = true)
 |-- Neighbourhood: string (nullable = true)
 |-- Scholarship: string (nullable = true)
 |-- Hipertension: string (nullable = true)
 |-- Diabetes: string (nullable = true)
 |-- Alcoholism: string (nullable = true)
 |-- Handcap: string (nullable = true)
 |-- SMS_received: string (nullable = true)
 |-- No-show: string (nullable = true)



To view a sample of the dataset, you can use `show()` or `take()` and specify how many rows you want to see:

In [7]:
noshow_dataset.show(2)

+---------------+-------------+------+--------------------+--------------------+---+---------------+-----------+------------+--------+----------+-------+------------+-------+
|      PatientId|AppointmentID|Gender|        ScheduledDay|      AppointmentDay|Age|  Neighbourhood|Scholarship|Hipertension|Diabetes|Alcoholism|Handcap|SMS_received|No-show|
+---------------+-------------+------+--------------------+--------------------+---+---------------+-----------+------------+--------+----------+-------+------------+-------+
| 29872499824296|      5642903|     F|2016-04-29T18:38:08Z|2016-04-29T00:00:00Z| 62|JARDIM DA PENHA|          0|           1|       0|         0|      0|           0|     No|
|558997776694438|      5642503|     M|2016-04-29T16:08:27Z|2016-04-29T00:00:00Z| 56|JARDIM DA PENHA|          0|           0|       0|         0|      0|           0|     No|
+---------------+-------------+------+--------------------+--------------------+---+---------------+-----------+------------+--------+----------+-------+------------+-------+

In [8]:
noshow_dataset.take(2)

[Row(PatientId='29872499824296', AppointmentID='5642903', Gender='F', ScheduledDay='2016-04-29T18:38:08Z', AppointmentDay='2016-04-29T00:00:00Z', Age='62', Neighbourhood='JARDIM DA PENHA', Scholarship='0', Hipertension='1', Diabetes='0', Alcoholism='0', Handcap='0', SMS_received='0', No-show='No'),
 Row(PatientId='558997776694438', AppointmentID='5642503', Gender='M', ScheduledDay='2016-04-29T16:08:27Z', AppointmentDay='2016-04-29T00:00:00Z', Age='56', Neighbourhood='JARDIM DA PENHA', Scholarship='0', Hipertension='0', Diabetes='0', Alcoholism='0', Handcap='0', SMS_received='0', No-show='No')]

Also, it is possible to use `toPandas()` to show data as a Pandas dataframe:

In [9]:
noshow_dataset.limit(2).toPandas()

Unnamed: 0,PatientId,AppointmentID,Gender,ScheduledDay,AppointmentDay,Age,Neighbourhood,Scholarship,Hipertension,Diabetes,Alcoholism,Handcap,SMS_received,No-show
0,29872499824296,5642903,F,2016-04-29T18:38:08Z,2016-04-29T00:00:00Z,62,JARDIM DA PENHA,0,1,0,0,0,0,No
1,558997776694438,5642503,M,2016-04-29T16:08:27Z,2016-04-29T00:00:00Z,56,JARDIM DA PENHA,0,0,0,0,0,0,No


# Data Wrangling

#### Before any data analysis, we need to make sure our data is clean and organized. This involves, for example, checking for null values or checking that values are spelled consistently. In this dataset we need to deal with the following:
1. check for missing or null values.
2. change the data type of columns ScheduledDay and AppointmentDay (as is: string, to: date).
3. standardize the neighbourhood names to lowercase.
4. rename the columns No-show and SMS_received to NoShow and SMSReceived.
5. for numeric variables, check the values with an exploratory data analysis.


To see how many rows there are:

In [10]:
noshow_dataset.count()

110527

To drop duplicated rows:

In [11]:
noshow_dataset = noshow_dataset.dropDuplicates()

### 1. Check the presence of missing or null values

In [12]:
def count_missing_values(dataframe):
    """Count empty-string, null, and NaN values in each column of a Spark DataFrame."""
    column_names = dataframe.columns
    missing_values_count = []

    # One filter-and-count job per column
    for column_name in column_names:
        column = dataframe[column_name]
        missing_values_count.append(dataframe.filter((column == "") | column.isNull() | isnan(column)).count())

    df_result = pd.DataFrame({'column_name': column_names, 'missing_values_count': missing_values_count})

    return df_result

In [13]:
count_missing_values(noshow_dataset)

Unnamed: 0,column_name,missing_values_count
0,PatientId,0
1,AppointmentID,0
2,Gender,0
3,ScheduledDay,0
4,AppointmentDay,0
5,Age,0
6,Neighbourhood,0
7,Scholarship,0
8,Hipertension,0
9,Diabetes,0


How can we remove or replace missing values when they are present?

In [14]:
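# Drop rows with a missing value in any column:
# df = df.dropna()
# Drop rows with a missing value in specific columns:
# df = df.dropna(subset=["column1", "column2"])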

In [15]:
# Replace missing values:
# df = df.fillna(0)

### 2. Change the data type of columns ScheduledDay and AppointmentDay (as is: string, to: date)

We are going to use the `udf` function, which stands for User Defined Function.

It allows you to write a function and call it later on the column you want to apply it to.

Here we build a UDF that converts the values in columns ScheduledDay and AppointmentDay from string type to date type using an anonymous (lambda) function. We have to specify the date format used in the string and parse it with the method `datetime.strptime`.

As the default return type of a UDF is StringType, we have to declare the return type we need with `DateType()`.



In [16]:
transform_str_to_date = udf(lambda time_column: datetime.strptime(time_column,"%Y-%m-%dT%H:%M:%SZ"), DateType())
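
To see what the lambda does to a single value, the same `strptime` call can be run in plain Python. This is only an illustration on one string taken from the sample rows; the UDF applies the same parsing to every row.

```python
from datetime import datetime

raw_value = "2016-04-29T18:38:08Z"  # value copied from the sample rows

# The format string mirrors the layout of the text: date, literal "T", time, literal "Z".
parsed = datetime.strptime(raw_value, "%Y-%m-%dT%H:%M:%SZ")

print(parsed)         # full date and time
print(parsed.date())  # date part only, which is what a date column keeps
```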

<br> 

Now call the UDF created above and store its results in a new column called ScheduledDayC, created with the method `withColumn`:

`withColumn("name_of_the_new_column", udf_function(name_of_the_column_that_will_be_transformed))`

<br>

In [17]:
noshow_dataset = noshow_dataset.withColumn("ScheduledDayC", transform_str_to_date("ScheduledDay"))

To check that our function worked, we can select only the columns we want to see with:

`df.select(["column1", "column2"]).show()`

In [18]:
noshow_dataset.select(["ScheduledDay", "ScheduledDayC"]).show(6)

+--------------------+-------------+
|        ScheduledDay|ScheduledDayC|
+--------------------+-------------+
|2016-04-28T08:04:48Z|   2016-04-28|
|2016-04-29T12:04:30Z|   2016-04-29|
|2016-04-19T08:35:26Z|   2016-04-19|
|2016-04-27T12:49:26Z|   2016-04-27|
|2016-04-29T14:16:40Z|   2016-04-29|
|2016-04-29T14:00:37Z|   2016-04-29|
+--------------------+-------------+
only showing top 6 rows



In [19]:
noshow_dataset = noshow_dataset.withColumn("AppointmentDayC", transform_str_to_date("AppointmentDay"))

In [20]:
noshow_dataset.select(["AppointmentDay", "AppointmentDayC"]).show(3)

+--------------------+---------------+
|      AppointmentDay|AppointmentDayC|
+--------------------+---------------+
|2016-04-29T00:00:00Z|     2016-04-29|
|2016-04-29T00:00:00Z|     2016-04-29|
|2016-04-29T00:00:00Z|     2016-04-29|
+--------------------+---------------+
only showing top 3 rows



In [21]:
noshow_dataset.printSchema()

root
 |-- PatientId: string (nullable = true)
 |-- AppointmentID: string (nullable = true)
 |-- Gender: string (nullable = true)
 |-- ScheduledDay: string (nullable = true)
 |-- AppointmentDay: string (nullable = true)
 |-- Age: string (nullable = true)
 |-- Neighbourhood: string (nullable = true)
 |-- Scholarship: string (nullable = true)
 |-- Hipertension: string (nullable = true)
 |-- Diabetes: string (nullable = true)
 |-- Alcoholism: string (nullable = true)
 |-- Handcap: string (nullable = true)
 |-- SMS_received: string (nullable = true)
 |-- No-show: string (nullable = true)
 |-- ScheduledDayC: date (nullable = true)
 |-- AppointmentDayC: date (nullable = true)



### 3. Standardize the names of the neighborhood to lowercase

In [22]:
neighbourhood_lowercase = udf(lambda x: x.lower())

In [23]:
noshow_dataset = noshow_dataset.withColumn("NeighbourhoodLC", neighbourhood_lowercase(noshow_dataset.Neighbourhood))

In [24]:
noshow_dataset.take(2)

[Row(PatientId='3326985214249', AppointmentID='5632495', Gender='F', ScheduledDay='2016-04-28T08:04:48Z', AppointmentDay='2016-04-29T00:00:00Z', Age='0', Neighbourhood='SANTO ANTÔNIO', Scholarship='0', Hipertension='0', Diabetes='0', Alcoholism='0', Handcap='0', SMS_received='0', No-show='Yes', ScheduledDayC=datetime.date(2016, 4, 28), AppointmentDayC=datetime.date(2016, 4, 29), NeighbourhoodLC='santo antônio'),
 Row(PatientId='4755762696573', AppointmentID='5641034', Gender='F', ScheduledDay='2016-04-29T12:04:30Z', AppointmentDay='2016-04-29T00:00:00Z', Age='36', Neighbourhood='MARIA ORTIZ', Scholarship='0', Hipertension='0', Diabetes='0', Alcoholism='0', Handcap='0', SMS_received='0', No-show='No', ScheduledDayC=datetime.date(2016, 4, 29), AppointmentDayC=datetime.date(2016, 4, 29), NeighbourhoodLC='maria ortiz')]

### 4. Rename the columns No-show and SMS_received to NoShow and SMSReceived

To rename a column we can use `withColumnRenamed()`:

In [25]:
noshow_dataset = noshow_dataset.withColumnRenamed("No-show", "NoShow") \
                                .withColumnRenamed("SMS_received", "SMSReceived")

In [26]:
noshow_dataset.show(2)

+-------------+-------------+------+--------------------+--------------------+---+-------------+-----------+------------+--------+----------+-------+-----------+------+-------------+---------------+---------------+
|    PatientId|AppointmentID|Gender|        ScheduledDay|      AppointmentDay|Age|Neighbourhood|Scholarship|Hipertension|Diabetes|Alcoholism|Handcap|SMSReceived|NoShow|ScheduledDayC|AppointmentDayC|NeighbourhoodLC|
+-------------+-------------+------+--------------------+--------------------+---+-------------+-----------+------------+--------+----------+-------+-----------+------+-------------+---------------+---------------+
|3326985214249|      5632495|     F|2016-04-28T08:04:48Z|2016-04-29T00:00:00Z|  0|SANTO ANTÔNIO|          0|           0|       0|         0|      0|          0|   Yes|   2016-04-28|     2016-04-29|  santo antônio|
|4755762696573|      5641034|     F|2016-04-29T12:04:30Z|2016-04-29T00:00:00Z| 36|  MARIA ORTIZ|          0|           0|       0|         0

### 5. For numeric variables check values with an exploratory data analysis

Use `withColumn` and `cast` to change data type:

In [27]:
noshow_dataset = noshow_dataset.withColumn("Age", col("Age").cast(IntegerType()))

In [28]:
noshow_dataset.printSchema()

root
 |-- PatientId: string (nullable = true)
 |-- AppointmentID: string (nullable = true)
 |-- Gender: string (nullable = true)
 |-- ScheduledDay: string (nullable = true)
 |-- AppointmentDay: string (nullable = true)
 |-- Age: integer (nullable = true)
 |-- Neighbourhood: string (nullable = true)
 |-- Scholarship: string (nullable = true)
 |-- Hipertension: string (nullable = true)
 |-- Diabetes: string (nullable = true)
 |-- Alcoholism: string (nullable = true)
 |-- Handcap: string (nullable = true)
 |-- SMSReceived: string (nullable = true)
 |-- NoShow: string (nullable = true)
 |-- ScheduledDayC: date (nullable = true)
 |-- AppointmentDayC: date (nullable = true)
 |-- NeighbourhoodLC: string (nullable = true)



Use `describe()`, similar to Pandas, to get descriptive statistics:

In [29]:
noshow_dataset.describe("Age").show()

+-------+------------------+
|summary|               Age|
+-------+------------------+
|  count|            110527|
|   mean| 37.08887421173107|
| stddev|23.110204963682634|
|    min|                -1|
|    max|               115|
+-------+------------------+



We can see that the minimum age is negative. We are assuming age is expressed in years.

To see these records:

In [30]:
noshow_dataset.select(["PatientId", "AppointmentId", "Age"]).where(noshow_dataset.Age == -1).collect()

[Row(PatientId='465943158731293', AppointmentId='5775010', Age=-1)]

We can observe that only one record has a negative age, which is probably incorrect.

To remove it we can use `filter`:

In [31]:
noshow_dataset = noshow_dataset.filter(noshow_dataset.Age >= 0)

In [32]:
noshow_dataset.describe("Age").show()

+-------+------------------+
|summary|               Age|
+-------+------------------+
|  count|            110526|
|   mean|37.089218826339504|
| stddev|23.110025518540056|
|    min|                 0|
|    max|               115|
+-------+------------------+



# Exploratory Data Analysis

Now that we have cleaned and wrangled our dataset, we are ready to perform some exploratory data analysis.

- Frequency distribution of show and no-show.
- No show according to the age.
- Which neighbourhoods have higher no-show occurrence?
- No shows by month

### Frequency distribution of show and no-show

In [33]:
# Count the appointments in each NoShow class (Yes / No):

noshow_frequency = noshow_dataset.groupby(noshow_dataset.NoShow).count()
noshow_frequency.show()

+------+-----+
|NoShow|count|
+------+-----+
|    No|88207|
|   Yes|22319|
+------+-----+
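
The counts above can be turned into a proportion with a little arithmetic. This sketch simply reuses the two numbers printed in the output.

```python
# Counts copied from the output above.
counts = {"No": 88207, "Yes": 22319}

total = sum(counts.values())
no_show_rate = counts["Yes"] / total

print(f"{total} appointments, no-show rate: {no_show_rate:.1%}")
```

The total matches the row count after removing the record with a negative age.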



### No show according to the age

Below we group the data by the no-show classes, Yes and No, using `groupby`, apply the aggregation function `agg(F.avg())` to calculate the mean age, and use `alias` to name the resulting column:

In [34]:
noshow_age = noshow_dataset.groupby(noshow_dataset.NoShow).agg(F.avg(noshow_dataset.Age).alias("MeanAge"))

In [35]:
noshow_age.show()

+------+------------------+
|NoShow|           MeanAge|
+------+------------------+
|    No|37.790504154999034|
|   Yes| 34.31766656212196|
+------+------------------+



### Which neighbourhoods have higher no-show occurrence?


In [36]:
noshow_neighbourhood = noshow_dataset.filter(noshow_dataset.NoShow == "Yes") \
                                                        .groupby(noshow_dataset.NeighbourhoodLC) \
                                                        .count() \
                                                        .sort(desc("count"))

In [37]:
noshow_neighbourhood.show()

+-----------------+-----+
|  NeighbourhoodLC|count|
+-----------------+-----+
|   jardim camburi| 1465|
|      maria ortiz| 1219|
|          itararé|  923|
|      resistência|  906|
|           centro|  703|
|jesus de nazareth|  696|
|  jardim da penha|  631|
|        caratoíra|  591|
|       tabuazeiro|  573|
|           bonfim|  550|
| ilha do príncipe|  532|
|       andorinhas|  521|
|        são pedro|  515|
|      santo andré|  508|
|     santa martha|  496|
|    santo antônio|  484|
|            romão|  474|
|         gurigica|  456|
|           jabour|  451|
|         da penha|  429|
+-----------------+-----+
only showing top 20 rows



### No shows by month

In [38]:
get_month = udf(lambda time_column: time_column.month, IntegerType())

In [39]:
noshow_dataset = noshow_dataset.withColumn("MonthAppointment", get_month(noshow_dataset.AppointmentDayC))

In [40]:
noshow_dataset.take(2)

[Row(PatientId='3326985214249', AppointmentID='5632495', Gender='F', ScheduledDay='2016-04-28T08:04:48Z', AppointmentDay='2016-04-29T00:00:00Z', Age=0, Neighbourhood='SANTO ANTÔNIO', Scholarship='0', Hipertension='0', Diabetes='0', Alcoholism='0', Handcap='0', SMSReceived='0', NoShow='Yes', ScheduledDayC=datetime.date(2016, 4, 28), AppointmentDayC=datetime.date(2016, 4, 29), NeighbourhoodLC='santo antônio', MonthAppointment=4),
 Row(PatientId='4755762696573', AppointmentID='5641034', Gender='F', ScheduledDay='2016-04-29T12:04:30Z', AppointmentDay='2016-04-29T00:00:00Z', Age=36, Neighbourhood='MARIA ORTIZ', Scholarship='0', Hipertension='0', Diabetes='0', Alcoholism='0', Handcap='0', SMSReceived='0', NoShow='No', ScheduledDayC=datetime.date(2016, 4, 29), AppointmentDayC=datetime.date(2016, 4, 29), NeighbourhoodLC='maria ortiz', MonthAppointment=4)]

In [41]:
noshow_by_month = noshow_dataset.filter(noshow_dataset.NoShow == "Yes") \
                                                        .groupby(noshow_dataset.MonthAppointment) \
                                                        .count()

In [42]:
noshow_by_month.show()

+----------------+-----+
|MonthAppointment|count|
+----------------+-----+
|               6| 4882|
|               5|16804|
|               4|  633|
+----------------+-----+

