# Pyspark 

### Food

##### i.Extract:  Load the data
   - Read data as csv via spark dataframe

##### ii.Transform: Exploratory data analysis using spark df
    - Unique records count
    - select columns GPA	Gender	breakfast	calories_chicken	calories_day	calories_scone	coffee type_sports
    - handling nan -> replace with 0 for numerical columns and Others in categorical columns
    - Replace - Gender -> 1 to M and 2 to F
    - Update : Normalize GPA column - normalized  v = v - min(GPA)/max(GPA)-min(GPA)
    - show schema
    - show all df
    - GroupBy type_sport and sum of cofee

##### iii.Load: Save analysis report
    - show all df, save as files

In [31]:
# loading pyspark environments

from random import random
import os
from pyspark.sql import SparkSession

In [32]:
# importing spark session

spark_session = SparkSession.builder.master("local").\
        appName("SparkApplication").\
        config("spark.driver.bindAddress","localhost").\
        config("spark.ui.port","4041").\
        getOrCreate()
sc = spark_session.sparkContext

In [33]:
# loading dataset using spark dataframe:

data = spark_session.read.load("C:/Users/AMAL JOSEPH/Downloads/food_coded.csv",format="csv", sep=",", inferSchema="true", header="true")

In [34]:
# viewing dtypes 
data.printSchema()

root
 |-- GPA: string (nullable = true)
 |-- Gender: string (nullable = true)
 |-- breakfast: string (nullable = true)
 |-- calories_chicken: string (nullable = true)
 |-- calories_day: string (nullable = true)
 |-- calories_scone: string (nullable = true)
 |-- coffee: string (nullable = true)
 |-- comfort_food: string (nullable = true)
 |-- comfort_food_reasons: string (nullable = true)
 |-- comfort_food_reasons_coded9: string (nullable = true)
 |-- cook: string (nullable = true)
 |-- comfort_food_reasons_coded11: integer (nullable = true)
 |-- cuisine: string (nullable = true)
 |-- diet_current: string (nullable = true)
 |-- diet_current_coded: string (nullable = true)
 |-- drink: string (nullable = true)
 |-- eating_changes: string (nullable = true)
 |-- eating_changes_coded: string (nullable = true)
 |-- eating_changes_coded1: string (nullable = true)
 |-- eating_out: string (nullable = true)
 |-- employment: string (nullable = true)
 |-- ethnic_food: string (nullable = true)
 |-- 

#### Transform:


##### Unique records count

In [35]:
print("Count: " + str(data.distinct().count()))

Count: 138


In [36]:
from pyspark.sql.functions import col, countDistinct

data.agg(*(countDistinct(col(c)).alias(c) for c in data.columns))

DataFrame[GPA: bigint, Gender: bigint, breakfast: bigint, calories_chicken: bigint, calories_day: bigint, calories_scone: bigint, coffee: bigint, comfort_food: bigint, comfort_food_reasons: bigint, comfort_food_reasons_coded9: bigint, cook: bigint, comfort_food_reasons_coded11: bigint, cuisine: bigint, diet_current: bigint, diet_current_coded: bigint, drink: bigint, eating_changes: bigint, eating_changes_coded: bigint, eating_changes_coded1: bigint, eating_out: bigint, employment: bigint, ethnic_food: bigint, exercise: bigint, father_education: bigint, father_profession: bigint, fav_cuisine: bigint, fav_cuisine_coded: bigint, fav_food: bigint, food_childhood: bigint, fries: bigint, fruit_day: bigint, grade_level: bigint, greek_food: bigint, healthy_feeling: bigint, healthy_meal: bigint, ideal_diet: bigint, ideal_diet_coded: bigint, income: bigint, indian_food: bigint, italian_food: bigint, life_rewarding: bigint, marital_status: bigint, meals_dinner_friend: bigint, mother_education: bi

In [37]:
data.distinct().show()

+--------------------+--------------------+--------------------+----------------+------------+--------------+------+--------------------+--------------------+---------------------------+------------------+----------------------------+-------+--------------------+------------------+--------------------+--------------------+--------------------+---------------------+--------------------+--------------------+-----------+--------+----------------+--------------------+--------------------+-----------------+--------------------+--------------------+----------------+---------+-----------+----------+---------------+--------------------+--------------------+--------------------+--------------------+-----------+------------+--------------+--------------------+--------------------+--------------------+--------------------+-----------------+--------------------+------------+------------+------------+----------------------+----+------+---------+-----------------+---------------+--------------------

In [38]:
data.dtypes

[('GPA', 'string'),
 ('Gender', 'string'),
 ('breakfast', 'string'),
 ('calories_chicken', 'string'),
 ('calories_day', 'string'),
 ('calories_scone', 'string'),
 ('coffee', 'string'),
 ('comfort_food', 'string'),
 ('comfort_food_reasons', 'string'),
 ('comfort_food_reasons_coded9', 'string'),
 ('cook', 'string'),
 ('comfort_food_reasons_coded11', 'int'),
 ('cuisine', 'string'),
 ('diet_current', 'string'),
 ('diet_current_coded', 'string'),
 ('drink', 'string'),
 ('eating_changes', 'string'),
 ('eating_changes_coded', 'string'),
 ('eating_changes_coded1', 'string'),
 ('eating_out', 'string'),
 ('employment', 'string'),
 ('ethnic_food', 'string'),
 ('exercise', 'string'),
 ('father_education', 'string'),
 ('father_profession', 'string'),
 ('fav_cuisine', 'string'),
 ('fav_cuisine_coded', 'string'),
 ('fav_food', 'string'),
 ('food_childhood', 'string'),
 ('fries', 'string'),
 ('fruit_day', 'string'),
 ('grade_level', 'int'),
 ('greek_food', 'int'),
 ('healthy_feeling', 'int'),
 ('healt

####  - select columns :GPA	,Gender	,breakfast,	calories_chicken,calories_day	calories_scone,coffee type_sports

In [39]:
#seleting specific columns :

df = data.select('GPA', 'Gender','breakfast', 'calories_chicken', 'calories_day', 'calories_scone' ,'coffee', 'type_sports')
df.show()

+---------------+------+--------------------+----------------+------------+--------------+------+--------------------+
|            GPA|Gender|           breakfast|calories_chicken|calories_day|calories_scone|coffee|         type_sports|
+---------------+------+--------------------+----------------+------------+--------------+------+--------------------+
|            2.4|     2|                   1|             430|         nan|           315|     1|          car racing|
|          3.654|     1|                   1|             610|           3|           420|     2|         Basketball |
|            3.3|     1|                   1|             720|           4|           420|     2|                 500|
|            3.2|     1|                   1|             430|           3|           420|     2|                null|
| Stuffed Shells|  null|                null|            null|        null|          null|  null|                null|
|Homemade Chili"|     4|Special Education...|   

#### Handling missing nan -> replace with 0 for numerical columns and Others in categorical columns

In [40]:
# Viewing the no of null values:

from pyspark.sql.functions import col,isnan, when, count
df.select([count(when(isnan(c) | col(c).isNull(), c)).alias(c) for c in df.columns]).show()

+---+------+---------+----------------+------------+--------------+------+-----------+
|GPA|Gender|breakfast|calories_chicken|calories_day|calories_scone|coffee|type_sports|
+---+------+---------+----------------+------------+--------------+------+-----------+
|  2|     5|        6|               5|          24|             7|     6|         39|
+---+------+---------+----------------+------------+--------------+------+-----------+



In [41]:
# Replacing NAN value with '0':

import pyspark.sql.functions as F
columns = df.columns
for column in columns:
    df = df.withColumn(column,F.when(F.isnan(F.col(column)),0).otherwise(F.col(column)))


In [42]:
# Viewing the data after replacing the values:

print(df.show())

+---------------+------+--------------------+----------------+------------+--------------+------+--------------------+
|            GPA|Gender|           breakfast|calories_chicken|calories_day|calories_scone|coffee|         type_sports|
+---------------+------+--------------------+----------------+------------+--------------+------+--------------------+
|            2.4|     2|                   1|             430|           0|           315|     1|          car racing|
|          3.654|     1|                   1|             610|           3|           420|     2|         Basketball |
|            3.3|     1|                   1|             720|           4|           420|     2|                 500|
|            3.2|     1|                   1|             430|           3|           420|     2|                null|
| Stuffed Shells|  null|                null|            null|        null|          null|  null|                null|
|Homemade Chili"|     4|Special Education...|   

 #### Replace - Gender -> 1 to M and 2 to F

In [43]:
# Repacing the values:

from pyspark.sql.functions import regexp_replace, when
df = df.withColumn('Gender', regexp_replace('Gender', '1', 'M'))
df = df.withColumn('Gender', regexp_replace('Gender', '2', 'F'))
df.show()

+---------------+------+--------------------+----------------+------------+--------------+------+--------------------+
|            GPA|Gender|           breakfast|calories_chicken|calories_day|calories_scone|coffee|         type_sports|
+---------------+------+--------------------+----------------+------------+--------------+------+--------------------+
|            2.4|     F|                   1|             430|           0|           315|     1|          car racing|
|          3.654|     M|                   1|             610|           3|           420|     2|         Basketball |
|            3.3|     M|                   1|             720|           4|           420|     2|                 500|
|            3.2|     M|                   1|             430|           3|           420|     2|                null|
| Stuffed Shells|  null|                null|            null|        null|          null|  null|                null|
|Homemade Chili"|     4|Special Education...|   

#### Update : Normalize GPA column - normalized  v = v - min(GPA)/max(GPA)-min(GPA)

In [44]:
# importing the values:
import os
import pyspark
from pyspark.sql import SparkSession
from datetime import datetime, date
import pandas as pd
from pyspark.sql import Row
from pyspark.sql.functions import upper,pandas_udf,PandasUDFType,udf
from pyspark.sql.types import *
from pyspark.sql.functions import min, max, lit, first
from pyspark.sql import Row
from pyspark.sql.functions import col, greatest
from pyspark.sql.functions import least

In [45]:
# Adding Column 'v'
df = df.withColumn('v', df.GPA)
df.show()

+---------------+------+--------------------+----------------+------------+--------------+------+--------------------+---------------+
|            GPA|Gender|           breakfast|calories_chicken|calories_day|calories_scone|coffee|         type_sports|              v|
+---------------+------+--------------------+----------------+------------+--------------+------+--------------------+---------------+
|            2.4|     F|                   1|             430|           0|           315|     1|          car racing|            2.4|
|          3.654|     M|                   1|             610|           3|           420|     2|         Basketball |          3.654|
|            3.3|     M|                   1|             720|           4|           420|     2|                 500|            3.3|
|            3.2|     M|                   1|             430|           3|           420|     2|                null|            3.2|
| Stuffed Shells|  null|                null|          

In [46]:
# Converting Column value to Integer
df = df.withColumn("GPA", df["GPA"].cast(LongType()))
df = df.withColumn("breakfast", df["breakfast"].cast(LongType()))
df = df.withColumn("calories_chicken", df["calories_chicken"].cast(LongType()))
df = df.withColumn("calories_day", df["calories_day"].cast(LongType()))
df = df.withColumn("calories_scone", df["calories_scone"].cast(LongType()))
df = df.withColumn("coffee", df["coffee"].cast(LongType()))
df = df.withColumn("v", df.v.cast("double"))

In [47]:
# view the datatype
df.dtypes

[('GPA', 'bigint'),
 ('Gender', 'string'),
 ('breakfast', 'bigint'),
 ('calories_chicken', 'bigint'),
 ('calories_day', 'bigint'),
 ('calories_scone', 'bigint'),
 ('coffee', 'bigint'),
 ('type_sports', 'string'),
 ('v', 'double')]

In [48]:
df.show()

+----+------+---------+----------------+------------+--------------+------+--------------------+-----+
| GPA|Gender|breakfast|calories_chicken|calories_day|calories_scone|coffee|         type_sports|    v|
+----+------+---------+----------------+------------+--------------+------+--------------------+-----+
|   2|     F|        1|             430|           0|           315|     1|          car racing|  2.4|
|   3|     M|        1|             610|           3|           420|     2|         Basketball |3.654|
|   3|     M|        1|             720|           4|           420|     2|                 500|  3.3|
|   3|     M|        1|             430|           3|           420|     2|                null|  3.2|
|null|  null|     null|            null|        null|          null|  null|                null| null|
|null|     4|     null|               2|           1|             1|     2|                null| null|
|   3|     M|        1|             720|           2|           420|     

In [49]:
# normalizing v = v - min(GPA)/max(GPA)-min(GPA)
@pandas_udf("GPA long,Gender string,breakfast long,calories_chicken long,calories_day long,calories_scone long,coffee long,type_sports string, v double", PandasUDFType.GROUPED_MAP)  # doctest: +SKIP
def normalize(df):
    v = df.v
    GPA = df.GPA
    return df.assign(v=(v - min(GPA)) / max(GPA)-min(GPA))
df = df.groupby("GPA").apply(normalize)

####   - Show schema of Data

In [51]:
# Schema
df.printSchema()

root
 |-- GPA: long (nullable = true)
 |-- Gender: string (nullable = true)
 |-- breakfast: long (nullable = true)
 |-- calories_chicken: long (nullable = true)
 |-- calories_day: long (nullable = true)
 |-- calories_scone: long (nullable = true)
 |-- coffee: long (nullable = true)
 |-- type_sports: string (nullable = true)
 |-- v: double (nullable = true)



#### - Showing all dataframe

In [None]:
# showing dataframe
df.show()

+----+------+---------+----------------+------------+--------------+------+------------------+------+
| GPA|Gender|breakfast|calories_chicken|calories_day|calories_scone|coffee|       type_sports|     v|
+----+------+---------+----------------+------------+--------------+------+------------------+------+
|null|     M|        1|             610|           2|           980|     2|                 0|  null|
|null|     F|        1|             720|           4|           420|     2|          baseball|  null|
|null|     M|        1|             720|           3|           420|     2|None at the moment|  null|
|   0|     M|        1|             610|           4|           420|     2|                 0|  null|
|   0|     F|        2|             430|           0|           980|     2|            Hockey|  null|
|   2|     F|        1|             430|           0|           315|     1|        car racing|  -1.8|
|   2|     M|        1|             610|           3|           980|     2|       

#### - GroupBy type_sport and sum of cofee

In [None]:
#Grouping the data value coffee by type_sports
df.groupBy('type_sports').agg({'coffee': 'sum'}).collect()

[Row(type_sports='hockey', sum(coffee)=4),
 Row(type_sports='Rec Volleyball', sum(coffee)=2),
 Row(type_sports='Tennis', sum(coffee)=2),
 Row(type_sports='Ice Hockey', sum(coffee)=2),
 Row(type_sports=' None', sum(coffee)=2),
 Row(type_sports='tennis soccer gym', sum(coffee)=1),
 Row(type_sports='Volleyball, Track', sum(coffee)=2),
 Row(type_sports='none organized', sum(coffee)=2),
 Row(type_sports='Softball', sum(coffee)=6),
 Row(type_sports='None', sum(coffee)=8),
 Row(type_sports='horse back riding', sum(coffee)=1),
 Row(type_sports='none', sum(coffee)=13),
 Row(type_sports='0', sum(coffee)=38),
 Row(type_sports='basketball ', sum(coffee)=2),
 Row(type_sports='Running ', sum(coffee)=2),
 Row(type_sports='wrestling ', sum(coffee)=1),
 Row(type_sports='dancing ', sum(coffee)=1),
 Row(type_sports='None right now', sum(coffee)=2),
 Row(type_sports='Marching Band', sum(coffee)=2),
 Row(type_sports='Basketball ', sum(coffee)=3),
 Row(type_sports='car racing', sum(coffee)=1),
 Row(type_spo

### iii.Load: Save analysis report

In [None]:
# Saving the file into CSV
df.write.format('csv').option('header',True).mode('overwrite').save('result.csv')