# Pyspark + Python

In [1]:
from pyspark import SparkContext
from pyspark.sql import SparkSession

In [2]:
# create a SparkSession:
spark = SparkSession.builder.master('local').getOrCreate()

In [8]:
# Reading the data and save as Dataframe
spark_df = spark.read.csv('forestfires.csv', header='true', inferSchema='true')
type(spark_df)

pyspark.sql.dataframe.DataFrame

In [11]:
spark_df

DataFrame[X: int, Y: int, month: string, day: string, FFMC: double, DMC: double, DC: double, ISI: double, temp: double, RH: int, wind: double, rain: double, area: double]

In [12]:
spark_df.columns

['X',
 'Y',
 'month',
 'day',
 'FFMC',
 'DMC',
 'DC',
 'ISI',
 'temp',
 'RH',
 'wind',
 'rain',
 'area']

In [13]:
spark_df[['month','RH','rain']].show()

+-----+---+----+
|month| RH|rain|
+-----+---+----+
|  mar| 51| 0.0|
|  oct| 33| 0.0|
|  oct| 33| 0.0|
|  mar| 97| 0.2|
|  mar| 99| 0.0|
|  aug| 29| 0.0|
|  aug| 27| 0.0|
|  aug| 86| 0.0|
|  sep| 63| 0.0|
|  sep| 40| 0.0|
|  sep| 51| 0.0|
|  sep| 38| 0.0|
|  aug| 72| 0.0|
|  sep| 42| 0.0|
|  sep| 21| 0.0|
|  sep| 44| 0.0|
|  mar| 27| 0.0|
|  oct| 47| 0.0|
|  mar| 35| 0.0|
|  apr| 44| 0.0|
+-----+---+----+
only showing top 20 rows



In [15]:
# Selecting one column is different to pandas
spark_df.select('rain')

DataFrame[rain: double]

In [16]:
spark_df.dtypes

[('X', 'int'),
 ('Y', 'int'),
 ('month', 'string'),
 ('day', 'string'),
 ('FFMC', 'double'),
 ('DMC', 'double'),
 ('DC', 'double'),
 ('ISI', 'double'),
 ('temp', 'double'),
 ('RH', 'int'),
 ('wind', 'double'),
 ('rain', 'double'),
 ('area', 'double')]

In [17]:
# Correlation
# Put all the month together (groupby), then give me the mean of the column area 
spark_df_months = spark_df.groupBy('month').agg({'area': 'mean'})
spark_df_months

DataFrame[month: string, avg(area): double]

OBS! Notice how the grouped DataFrame is not returned when you call the aggregation method. Remember, this is still Spark! The transformations and actions are kept separate so that it is easier to manage large quantities of data. You can perform the transformation by calling .collect():

In [18]:
spark_df_months.collect()

[Row(month='jun', avg(area)=5.841176470588234),
 Row(month='aug', avg(area)=12.489076086956521),
 Row(month='may', avg(area)=19.24),
 Row(month='feb', avg(area)=6.275),
 Row(month='sep', avg(area)=17.942616279069753),
 Row(month='mar', avg(area)=4.356666666666667),
 Row(month='oct', avg(area)=6.638),
 Row(month='jul', avg(area)=14.3696875),
 Row(month='nov', avg(area)=0.0),
 Row(month='apr', avg(area)=8.891111111111112),
 Row(month='dec', avg(area)=13.33),
 Row(month='jan', avg(area)=0.0)]

In [19]:
spark_df_months

DataFrame[month: string, avg(area): double]

## Boolean Masking

Boolean masking also works with PySpark DataFrames just like Pandas DataFrames, the only difference being that the .filter() method is used in PySpark.

In [20]:
no_rain = spark_df.filter(spark_df['rain'] == 0.0)
some_rain = spark_df.filter(spark_df['rain'] > 0.0)

To perform calculations to find the mean of a column, we'll have to import functions from pyspark.sql. As always, to read more about them, check out the documentation.

In [21]:
from pyspark.sql.functions import mean

print('no rain fire area: ', no_rain.select(mean('area')).show(),'\n')

print('some rain fire area: ', some_rain.select(mean('area')).show(),'\n')

+------------------+
|         avg(area)|
+------------------+
|13.023693516699408|
+------------------+

no rain fire area:  None 

+---------+
|avg(area)|
+---------+
|  1.62375|
+---------+

some rain fire area:  None 



In [22]:
no_rain.select(mean('area')).show()

+------------------+
|         avg(area)|
+------------------+
|13.023693516699408|
+------------------+



Obtain data from only the summer months in Portugal (June, July, and August). We can also do the same for the winter months in Portugal (December, January, February).

In [23]:
summer_months = spark_df.filter(spark_df['month'].isin(['jun','jul','aug']))
winter_months = spark_df.filter(spark_df['month'].isin(['dec','jan','feb']))

print('summer months fire area', summer_months.select(mean('area')).show())
print('winter months fire areas', winter_months.select(mean('area')).show())

+------------------+
|         avg(area)|
+------------------+
|12.262317596566525|
+------------------+

summer months fire area None
+-----------------+
|        avg(area)|
+-----------------+
|7.918387096774193|
+-----------------+

winter months fire areas None


## SQL

SELECT CASE
       WHEN EDUCATION = '0' THEN 'Other'
       WHEN EDUCATION = '5' THEN 'Other'
       WHEN EDUCATION = '6' THEN 'Other'
       ELSE EDUCATION
       END AS EDUCATION
  FROM credit_card_default;

In [26]:
from pyspark.sql import functions as F

In [27]:
# Here I define which month is in every season

df_months_binned = spark_df.withColumn('month',
                                         F.when(spark_df['month'] == 'jun', 'Summer') \
                                         .when(spark_df['month'] == 'jul', 'Summer') \
                                         .when(spark_df['month'] == 'aug', 'Summer') \
                                         .when(spark_df['month'] == 'jan', 'Winter') \
                                         .when(spark_df['month'] == 'feb', 'Winter') \
                                         .when(spark_df['month'] == 'dec', 'Winter') \
                                         .otherwise('Spring/Fall')
                                        )

In [28]:
# Select both 'month' and 'area' before grouping and aggregating

result = df_months_binned.select('month', 'area').groupBy('month').agg({'area': 'mean'}).distinct()
result.show() # To display the results

+-----------+------------------+
|      month|         avg(area)|
+-----------+------------------+
|     Summer|12.262317596566525|
|Spring/Fall|13.989960474308296|
|     Winter| 7.918387096774193|
+-----------+------------------+

