### Setup

In [None]:
!apt-get update
!apt-get install openjdk-8-jdk-headless -qq > /dev/null
!wget -q http://archive.apache.org/dist/spark/spark-2.3.1/spark-2.3.1-bin-hadoop2.7.tgz
!tar xf spark-2.3.1-bin-hadoop2.7.tgz
!pip install -q findspark

import os
os.environ["JAVA_HOME"] = "/usr/lib/jvm/java-8-openjdk-amd64"
os.environ["SPARK_HOME"] = "/content/spark-2.3.1-bin-hadoop2.7"

!ls

import findspark
findspark.init()

import pyspark
from pyspark.sql import SparkSession
spark = SparkSession.builder.getOrCreate() 
spark

Hit:1 http://security.ubuntu.com/ubuntu bionic-security InRelease
Ign:2 https://developer.download.nvidia.com/compute/machine-learning/repos/ubuntu1804/x86_64  InRelease
Hit:3 https://developer.download.nvidia.com/compute/cuda/repos/ubuntu1804/x86_64  InRelease
Hit:4 https://developer.download.nvidia.com/compute/machine-learning/repos/ubuntu1804/x86_64  Release
Hit:5 http://archive.ubuntu.com/ubuntu bionic InRelease
Hit:6 http://ppa.launchpad.net/c2d4u.team/c2d4u4.0+/ubuntu bionic InRelease
Hit:7 https://cloud.r-project.org/bin/linux/ubuntu bionic-cran40/ InRelease
Hit:8 http://archive.ubuntu.com/ubuntu bionic-updates InRelease
Hit:10 http://archive.ubuntu.com/ubuntu bionic-backports InRelease
Hit:11 http://ppa.launchpad.net/cran/libgit2/ubuntu bionic InRelease
Hit:12 http://ppa.launchpad.net/deadsnakes/ppa/ubuntu bionic InRelease
Hit:13 http://ppa.launchpad.net/graphics-drivers/ppa/ubuntu bionic InRelease
Reading package lists... Done


### Loading Data 

In [None]:
df = spark.read.format('csv').option('header', 'true').load('/content/drive/MyDrive/original.csv')
df.show()

In [None]:
df = spark.read.csv('/content/drive/MyDrive/original.csv', header=True)
df.show()

In [None]:
df.dtypes

In [None]:
from pyspark.sql.types import *
schema = StructType([
  StructField('id', IntegerType()),
  StructField('first_name', StringType()),
  StructField('last_name', StringType()),
  StructField('gender', StringType()),
  StructField('city',  StringType()),
  StructField('job_title', StringType()),
  StructField('Salary', StringType()),
  StructField('latitude', FloatType()),
  StructField('longitude', FloatType())
])

df = spark.read.csv('/content/drive/MyDrive/original.csv', header=True,schema = schema)
df.show()

### Inspecting Data

In [None]:
df.dtypes

In [None]:
df.head(6)

In [None]:
df.first()

In [None]:
df.describe().show()

In [None]:
df.columns

In [None]:
df.count()

In [None]:
df.distinct().count()

###  Handling Null and Duplicate Values

In [None]:
df_dropped = df.na.drop()
df_dropped.show()

In [None]:
df_null_jobs = df.filter(df.job_title.isNotNull())
df_null_jobs.show()

In [None]:
from pyspark.sql.functions import *
df_handled = df.withColumn('clean_city', when(df.city.isNull(), 'Unknow').otherwise(df.city))
df_handled.show()

In [None]:
df_no_duplicates = df.dropDuplicates()
df_no_duplicates.show()

### Selecting & Filtering

In [None]:
df_select = df.select('first_name', 'last_name')
df_select.show()

In [None]:
df_renamed = df.withColumnRenamed('first_name', 'fn')
df_renamed.show()

In [None]:
df_filter = df.filter((df.first_name == 'Alvera'))
df_filter.show()

In [None]:
df_filter = df.filter((df.first_name.like('%ndr%')))
df_filter.show()

In [None]:
df_filter = df.filter((df.first_name.endswith('din')))
df_filter.show()

In [None]:
df_filter = df.filter((df.first_name.startswith('Alv')))
df_filter.show()

In [None]:
df_filter = df.filter(df.id.between(1,5))
df_filter.show()

In [None]:
df_filter = df.filter(df.first_name.isin('Aldin', 'Valma'))
df_filter.show()

In [67]:
df_substring = df.select(df.first_name, df.first_name.substr(1,5).alias('name'))
df_substring.show()

+----------+-----+
|first_name| name|
+----------+-----+
|   Melinde|Melin|
|  Kimberly|Kimbe|
|    Alvera|Alver|
|   Shannon|Shann|
|  Sherwood|Sherw|
|     Maris|Maris|
|     Masha|Masha|
|   Goddart|Godda|
|      Roth| Roth|
|      Bran| Bran|
|    Kylynn|Kylyn|
|       Rey|  Rey|
|      Kerr| Kerr|
|    Mickie|Micki|
|    Kaspar|Kaspa|
|    Norbie|Norbi|
|    Claude|Claud|
|     Thain|Thain|
|  Tiffanie|Tiffa|
|    Ettore|Ettor|
+----------+-----+
only showing top 20 rows



### Multiple Filters

In [68]:
# df_filter = df.filter((df.first_name.isin('Aldin','Valma')) & (df.city.like('%ondon')))
df_filter = df.filter((df.first_name.isin('Aldin','Valma')) | (df.city.like('%Caxias')))
df_filter.show()

+---+----------+-------------+------+-----------+----------------+---------+----------+---------+
| id|first_name|    last_name|gender|       city|       job_title|   Salary|  latitude|longitude|
+---+----------+-------------+------+-----------+----------------+---------+----------+---------+
| 37|     Nicko|        Frays|  Male|     Caxias|  Health Coach I|$99786.40|-4.8654137|  -43.362|
|569|     Valma|      Bratton|Female|  Kurayoshi|Web Developer II|$32665.89| 35.449905|133.76134|
|901|     Aldin|Matuszkiewicz|  Male|East London|        Operator|$41468.83|-32.954933|27.931913|
+---+----------+-------------+------+-----------+----------------+---------+----------+---------+



In [69]:
# df_filter = df.filter((df.id > 10) & (df.id < 18))
df_filter = df.filter((df.id > 10) & (df.id <= 18))
df_filter.show()

+---+----------+---------+------+--------------+--------------------+---------+---------+----------+
| id|first_name|last_name|gender|          city|           job_title|   Salary| latitude| longitude|
+---+----------+---------+------+--------------+--------------------+---------+---------+----------+
| 11|    Kylynn|  Lockart|Female|      El Cardo|Nuclear Power Eng...|$13604.63|    -5.85| -79.88333|
| 12|       Rey|   Meharg|Female|   Wangqingtuo|Systems Administr...|$73423.70| 39.17238| 116.93161|
| 13|      Kerr|   Braden|  Male|     Sułkowice|Compensation Analyst|$33432.99| 49.81518| 19.377174|
| 14|    Mickie|Whanstall|  Male|   Springfield|Assistant Media P...|$50838.53| 42.10148|-72.576675|
| 15|    Kaspar|    Pally|  Male|        Chrást|  Analyst Programmer|$40163.03| 49.79233| 13.491532|
| 16|    Norbie|   Gwyllt|  Male|        Xijiao|              Editor|$32492.73|43.494576|  5.897802|
| 17|    Claude|   Briant|Female|     Mieścisko|Research Assistan...|$51862.48|52.744167| 1

### SQL on Dataframes

In [70]:
df.registerTempTable('original')
query1 = spark.sql("""
  SELECT *
  FROM original
""")
query1.show()

+---+----------+----------+------+---------------+--------------------+---------+----------+----------+
| id|first_name| last_name|gender|           city|           job_title|   Salary|  latitude| longitude|
+---+----------+----------+------+---------------+--------------------+---------+----------+----------+
|  1|   Melinde| Shilburne|Female|      Nowa Ruda| Assistant Professor|$57438.18| 50.577408| 16.496717|
|  2|  Kimberly|Von Welden|Female|         Bulgan|       Programmer II|$62846.60|  48.82316| 103.52182|
|  3|    Alvera|  Di Boldi|Female|           null|                null|$57576.52| 39.994747|116.339775|
|  4|   Shannon| O'Griffin|  Male|  Divnomorskoye|Budget/Accounting...|$61489.23| 44.504723| 38.130016|
|  5|  Sherwood|   Macieja|  Male|      Mytishchi|            VP Sales|$63863.09|      null| 37.648994|
|  6|     Maris|      Folk|Female|Kinsealy-Drinan|      Civil Engineer|$30101.16| 53.426613|-6.1644998|
|  7|     Masha|    Divers|Female|         Dachun|              

In [71]:
query2 = spark.sql("""
  SELECT 
    id
    , CONCAT(first_name," ",last_name) as full_name
    --, city -- commented line
  FROM original
  WHERE gender = 'Female'
""")
query2.show()

+---+-------------------+
| id|          full_name|
+---+-------------------+
|  1|  Melinde Shilburne|
|  2|Kimberly Von Welden|
|  3|    Alvera Di Boldi|
|  6|         Maris Folk|
|  7|       Masha Divers|
| 11|     Kylynn Lockart|
| 12|         Rey Meharg|
| 17|      Claude Briant|
| 19|  Tiffanie Pattison|
| 23|    Lurleen Janczak|
| 24|      Nichol Holtum|
| 25|       Shaun Bridle|
| 26|     Leandra Anfrey|
| 28|    Jaquelyn Hazard|
| 29|  Prudence Honacker|
| 30|       Cherey Liger|
| 31|          Neda Krop|
| 34|    Barbi Fattorini|
| 38|   Lonnie Townshend|
| 39|    Valida Salzberg|
+---+-------------------+
only showing top 20 rows



### Adding Calculated Columns

In [72]:
from pyspark.sql.functions import *
df_clean_salary = df.withColumn('clean_salary', df.Salary.substr(2,100).cast('float'))
df_clean_salary.show()

+---+----------+----------+------+---------------+--------------------+---------+----------+----------+------------+
| id|first_name| last_name|gender|           city|           job_title|   Salary|  latitude| longitude|clean_salary|
+---+----------+----------+------+---------------+--------------------+---------+----------+----------+------------+
|  1|   Melinde| Shilburne|Female|      Nowa Ruda| Assistant Professor|$57438.18| 50.577408| 16.496717|    57438.18|
|  2|  Kimberly|Von Welden|Female|         Bulgan|       Programmer II|$62846.60|  48.82316| 103.52182|     62846.6|
|  3|    Alvera|  Di Boldi|Female|           null|                null|$57576.52| 39.994747|116.339775|    57576.52|
|  4|   Shannon| O'Griffin|  Male|  Divnomorskoye|Budget/Accounting...|$61489.23| 44.504723| 38.130016|    61489.23|
|  5|  Sherwood|   Macieja|  Male|      Mytishchi|            VP Sales|$63863.09|      null| 37.648994|    63863.09|
|  6|     Maris|      Folk|Female|Kinsealy-Drinan|      Civil En

In [73]:
df_monthly_salary = df_clean_salary.withColumn('monthly_salary', df_clean_salary.clean_salary/12)
df_monthly_salary.show()

+---+----------+----------+------+---------------+--------------------+---------+----------+----------+------------+------------------+
| id|first_name| last_name|gender|           city|           job_title|   Salary|  latitude| longitude|clean_salary|    monthly_salary|
+---+----------+----------+------+---------------+--------------------+---------+----------+----------+------------+------------------+
|  1|   Melinde| Shilburne|Female|      Nowa Ruda| Assistant Professor|$57438.18| 50.577408| 16.496717|    57438.18| 4786.514973958333|
|  2|  Kimberly|Von Welden|Female|         Bulgan|       Programmer II|$62846.60|  48.82316| 103.52182|     62846.6|    5237.216796875|
|  3|    Alvera|  Di Boldi|Female|           null|                null|$57576.52| 39.994747|116.339775|    57576.52| 4798.043294270833|
|  4|   Shannon| O'Griffin|  Male|  Divnomorskoye|Budget/Accounting...|$61489.23| 44.504723| 38.130016|    61489.23|   5124.1025390625|
|  5|  Sherwood|   Macieja|  Male|      Mytishch

In [74]:
# df_female = df.withColumn('Female?', when(df.gender == 'Female', 'yes'). otherwise('no'))
df_female = df.withColumn('Female?', when(df.gender == 'Female', 1). otherwise(0))
df_female.show()

+---+----------+----------+------+---------------+--------------------+---------+----------+----------+-------+
| id|first_name| last_name|gender|           city|           job_title|   Salary|  latitude| longitude|Female?|
+---+----------+----------+------+---------------+--------------------+---------+----------+----------+-------+
|  1|   Melinde| Shilburne|Female|      Nowa Ruda| Assistant Professor|$57438.18| 50.577408| 16.496717|      1|
|  2|  Kimberly|Von Welden|Female|         Bulgan|       Programmer II|$62846.60|  48.82316| 103.52182|      1|
|  3|    Alvera|  Di Boldi|Female|           null|                null|$57576.52| 39.994747|116.339775|      1|
|  4|   Shannon| O'Griffin|  Male|  Divnomorskoye|Budget/Accounting...|$61489.23| 44.504723| 38.130016|      0|
|  5|  Sherwood|   Macieja|  Male|      Mytishchi|            VP Sales|$63863.09|      null| 37.648994|      0|
|  6|     Maris|      Folk|Female|Kinsealy-Drinan|      Civil Engineer|$30101.16| 53.426613|-6.1644998| 

### Group_By and Aggregation

In [75]:
import pyspark.sql.functions as sqlfunc
df1 = df_clean_salary.groupBy('gender').agg(sqlfunc.sum('clean_salary'))
df1.show()

+------+--------------------+
|gender|   sum(clean_salary)|
+------+--------------------+
|Female|2.7364519950195312E7|
|  Male|2.8123435678710938E7|
+------+--------------------+



In [76]:
df1 = df_clean_salary.groupBy('gender').agg(sqlfunc.sum('clean_salary').alias('Total'),
                                            sqlfunc.avg('clean_salary').alias('Average'),
                                            sqlfunc.min('clean_salary').alias('Min'),
                                            sqlfunc.max('clean_salary').alias('Max')
)
df1.show()

+------+--------------------+-----------------+--------+--------+
|gender|               Total|          Average|     Min|     Max|
+------+--------------------+-----------------+--------+--------+
|Female|2.7364519950195312E7|55618.94298820185|10616.44|99948.28|
|  Male|2.8123435678710938E7|55361.09385573019|10101.92|99942.92|
+------+--------------------+-----------------+--------+--------+



In [77]:
df1 = df_clean_salary.groupBy('gender', 'job_title').agg(sqlfunc.sum('clean_salary').alias('Total'),
                                            sqlfunc.avg('clean_salary').alias('Average'),
                                            sqlfunc.min('clean_salary').alias('Min'),
                                            sqlfunc.max('clean_salary').alias('Max')
)
df1.show()

+------+--------------------+-----------------+------------------+--------+--------+
|gender|           job_title|            Total|           Average|     Min|     Max|
+------+--------------------+-----------------+------------------+--------+--------+
|Female|    Statistician III|    44224.8984375|     44224.8984375| 44224.9| 44224.9|
|  Male|     Cost Accountant| 322273.427734375|40284.178466796875|15849.42|81698.25|
|Female|         Engineer IV|   134404.5703125|    67202.28515625| 57365.1|77039.47|
|Female| Clinical Specialist|189083.8876953125|47270.971923828125| 12468.7|81003.76|
|Female|    Dental Hygienist| 155758.638671875|25959.773111979168|10808.16| 44627.3|
|Female|Research Assistan...|   70371.83984375|   35185.919921875|21039.36|49332.48|
|Female|  Nurse Practicioner|292473.1064453125|  58494.6212890625|12908.38|91322.55|
|  Male| Geological Engineer|   296738.3828125|     59347.6765625|23825.54|94839.56|
|  Male|            VP Sales|  405120.08984375| 67520.01497395833

### Writing dataframe to file and to Pandas dataframe

In [78]:
# df1.write.json('df1.json')
# df1.write.parquet('df1.parquet')
# df1.write.csv('df1.csv')

In [85]:
import pandas as pd

from pyspark.sql.functions import *
df_clean_salary2 = df_dropped.withColumn('clean_salary', df.Salary.substr(2,100).cast('float'))

pandasDF = df_clean_salary2.toPandas()
pandasDF

Unnamed: 0,id,first_name,last_name,gender,city,job_title,Salary,latitude,longitude,clean_salary
0,1,Melinde,Shilburne,Female,Nowa Ruda,Assistant Professor,$57438.18,50.577408,16.496717,57438.179688
1,2,Kimberly,Von Welden,Female,Bulgan,Programmer II,$62846.60,48.823158,103.521820,62846.601562
2,4,Shannon,O'Griffin,Male,Divnomorskoye,Budget/Accounting Analyst II,$61489.23,44.504723,38.130016,61489.230469
3,6,Maris,Folk,Female,Kinsealy-Drinan,Civil Engineer,$30101.16,53.426613,-6.164500,30101.160156
4,8,Goddart,Flear,Male,Trélissac,Desktop Support Technician,$46116.36,45.190517,0.742312,46116.359375
...,...,...,...,...,...,...,...,...,...,...
992,996,Kathye,Grasser,Female,Dzaoudzi,Accountant IV,$65520.45,-12.782212,45.258209,65520.449219
993,997,Haskell,Kempston,Male,Ban Talat Nua,Biostatistician I,$37021.92,7.912274,98.345970,37021.921875
994,998,Holly-anne,Gerbl,Female,Guanaja,Speech Pathologist,$16200.10,16.482662,-85.879326,16200.099609
995,999,Marysa,Purdie,Female,Sioah,Desktop Support Technician,$95912.44,5.586496,95.351402,95912.437500
