# Config

In [1]:
# spark, pyspark
!apt-get install openjdk-8-jdk-headless -qq > /dev/null #renew java
!wget -q http://archive.apache.org/dist/spark/spark-3.1.1/spark-3.1.1-bin-hadoop3.2.tgz
!tar xf spark-3.1.1-bin-hadoop3.2.tgz
!pip install -q findspark

In [2]:
import os
os.environ["JAVA_HOME"] = "/usr/lib/jvm/java-8-openjdk-amd64"
os.environ["SPARK_HOME"] = "/content/spark-3.1.1-bin-hadoop3.2"

In [3]:
import findspark
findspark.init()

In [4]:
from pyspark.sql import SparkSession
spark = SparkSession.builder.master("local[*]").getOrCreate() #create a new session
spark

## Part1: Read a file, delete null values, create new cols, dtype changes, mean and median, when(), otherwise()

In [5]:
# read a file
# it takes the first row as its header
df = spark.read.format("csv").option("header","True").load("/content/drive/MyDrive/datasets/original.csv")

# another way: df = df.spark.read.csv("path",header=True)

In [6]:
df.show()

+---+----------+----------+------+---------------+--------------------+---------+----------+-----------+
| id|first_name| last_name|gender|           City|            JobTitle|   Salary|  Latitude|  Longitude|
+---+----------+----------+------+---------------+--------------------+---------+----------+-----------+
|  1|   Melinde| Shilburne|Female|      Nowa Ruda| Assistant Professor|$57438.18|50.5774075| 16.4967184|
|  2|  Kimberly|Von Welden|Female|         Bulgan|       Programmer II|$62846.60|48.8231572|103.5218199|
|  3|    Alvera|  Di Boldi|Female|           null|                null|$57576.52|39.9947462|116.3397725|
|  4|   Shannon| O'Griffin|  Male|  Divnomorskoye|Budget/Accounting...|$61489.23|44.5047212| 38.1300171|
|  5|  Sherwood|   Macieja|  Male|      Mytishchi|            VP Sales|$63863.09|      null| 37.6489954|
|  6|     Maris|      Folk|Female|Kinsealy-Drinan|      Civil Engineer|$30101.16|53.4266145| -6.1644997|
|  7|     Masha|    Divers|Female|         Dachun|     

In [7]:
# add a new city column without any NULLs

from pyspark.sql.functions import *
df2= df.withColumn("clean_city", when(df.City.isNull(),"unknown").otherwise(df.City))

In [8]:
df2.show()

+---+----------+----------+------+---------------+--------------------+---------+----------+-----------+---------------+
| id|first_name| last_name|gender|           City|            JobTitle|   Salary|  Latitude|  Longitude|     clean_city|
+---+----------+----------+------+---------------+--------------------+---------+----------+-----------+---------------+
|  1|   Melinde| Shilburne|Female|      Nowa Ruda| Assistant Professor|$57438.18|50.5774075| 16.4967184|      Nowa Ruda|
|  2|  Kimberly|Von Welden|Female|         Bulgan|       Programmer II|$62846.60|48.8231572|103.5218199|         Bulgan|
|  3|    Alvera|  Di Boldi|Female|           null|                null|$57576.52|39.9947462|116.3397725|        unknown|
|  4|   Shannon| O'Griffin|  Male|  Divnomorskoye|Budget/Accounting...|$61489.23|44.5047212| 38.1300171|  Divnomorskoye|
|  5|  Sherwood|   Macieja|  Male|      Mytishchi|            VP Sales|$63863.09|      null| 37.6489954|      Mytishchi|
|  6|     Maris|      Folk|Femal

In [9]:
# delete null job title rows with a filter

df = df.filter(df.JobTitle.isNotNull())
df.show()

+---+----------+----------+------+---------------+--------------------+---------+----------+-----------+
| id|first_name| last_name|gender|           City|            JobTitle|   Salary|  Latitude|  Longitude|
+---+----------+----------+------+---------------+--------------------+---------+----------+-----------+
|  1|   Melinde| Shilburne|Female|      Nowa Ruda| Assistant Professor|$57438.18|50.5774075| 16.4967184|
|  2|  Kimberly|Von Welden|Female|         Bulgan|       Programmer II|$62846.60|48.8231572|103.5218199|
|  4|   Shannon| O'Griffin|  Male|  Divnomorskoye|Budget/Accounting...|$61489.23|44.5047212| 38.1300171|
|  5|  Sherwood|   Macieja|  Male|      Mytishchi|            VP Sales|$63863.09|      null| 37.6489954|
|  6|     Maris|      Folk|Female|Kinsealy-Drinan|      Civil Engineer|$30101.16|53.4266145| -6.1644997|
|  8|   Goddart|     Flear|  Male|      Trélissac|Desktop Support T...|$46116.36|45.1905186|  0.7423124|
|  9|      Roth|O'Cannavan|  Male|         Heitan|VP Pr

In [10]:
# if salary is null, replace with mean salary

df = df.withColumn('clean_salary',df.Salary.substr(2,100).cast('float')) # change the dtype

mean = df.groupBy().avg('clean_salary').take(1)[0][0]

In [11]:
from pyspark.sql.functions import lit #lit = literal value

# rewriting with the mean rule
df = df.withColumn('new salary', when(df.clean_salary.isNull(), lit(mean)).otherwise(df.clean_salary))

In [12]:
df.show()

+---+----------+----------+------+---------------+--------------------+---------+----------+-----------+------------+----------------+
| id|first_name| last_name|gender|           City|            JobTitle|   Salary|  Latitude|  Longitude|clean_salary|      new salary|
+---+----------+----------+------+---------------+--------------------+---------+----------+-----------+------------+----------------+
|  1|   Melinde| Shilburne|Female|      Nowa Ruda| Assistant Professor|$57438.18|50.5774075| 16.4967184|    57438.18|   57438.1796875|
|  2|  Kimberly|Von Welden|Female|         Bulgan|       Programmer II|$62846.60|48.8231572|103.5218199|     62846.6|   62846.6015625|
|  4|   Shannon| O'Griffin|  Male|  Divnomorskoye|Budget/Accounting...|$61489.23|44.5047212| 38.1300171|    61489.23|  61489.23046875|
|  5|  Sherwood|   Macieja|  Male|      Mytishchi|            VP Sales|$63863.09|      null| 37.6489954|    63863.09|  63863.08984375|
|  6|     Maris|      Folk|Female|Kinsealy-Drinan|     

In [13]:
# get medians of some columns

import numpy as np

latitudes = df.select("Latitude")
#latitudes.show()

In [14]:
latitudes = latitudes.filter(latitudes.Latitude.isNotNull()) # delete null so the values are all strings

latitudes = latitudes.withColumn('lat2',latitudes.Latitude.cast("float")).select("lat2") #dtype change and only select the new col


In [15]:
median = np.median(latitudes.collect())
print(median)

31.93397331237793


In [16]:
df = df.withColumn("lat", when(df.Latitude.isNull(),lit(median)).otherwise(df.Latitude))

In [17]:
df.show()

+---+----------+----------+------+---------------+--------------------+---------+----------+-----------+------------+----------------+-----------------+
| id|first_name| last_name|gender|           City|            JobTitle|   Salary|  Latitude|  Longitude|clean_salary|      new salary|              lat|
+---+----------+----------+------+---------------+--------------------+---------+----------+-----------+------------+----------------+-----------------+
|  1|   Melinde| Shilburne|Female|      Nowa Ruda| Assistant Professor|$57438.18|50.5774075| 16.4967184|    57438.18|   57438.1796875|       50.5774075|
|  2|  Kimberly|Von Welden|Female|         Bulgan|       Programmer II|$62846.60|48.8231572|103.5218199|     62846.6|   62846.6015625|       48.8231572|
|  4|   Shannon| O'Griffin|  Male|  Divnomorskoye|Budget/Accounting...|$61489.23|44.5047212| 38.1300171|    61489.23|  61489.23046875|       44.5047212|
|  5|  Sherwood|   Macieja|  Male|      Mytishchi|            VP Sales|$63863.09| 

## Part 2: more analyses

**Three questions:**

*   Do men get more salary than women on average?
*   By job title, do they differ on avg salary?
*    which city has the highest salary?





In [18]:
# the first question
import pyspark.sql.functions as sqlfunc

genders = df.groupBy('gender').agg(sqlfunc.avg('clean_salary')).alias("salary")

In [19]:
genders.show()

+------+------------------+
|gender| avg(clean_salary)|
+------+------------------+
|Female|55677.250125558036|
|  Male| 55361.09385573019|
+------+------------------+



In [20]:
# if sub is male, add male value and 0 on female value, otherwise female value and male value to 0

df = df.withColumn("female salary", when(df.gender == 'Female', df.clean_salary).otherwise(lit(0)))
df = df.withColumn("male salary", when(df.gender == 'Male', df.clean_salary).otherwise(lit(0)))

In [21]:
df.show()

+---+----------+----------+------+---------------+--------------------+---------+----------+-----------+------------+----------------+-----------------+-------------+-----------+
| id|first_name| last_name|gender|           City|            JobTitle|   Salary|  Latitude|  Longitude|clean_salary|      new salary|              lat|female salary|male salary|
+---+----------+----------+------+---------------+--------------------+---------+----------+-----------+------------+----------------+-----------------+-------------+-----------+
|  1|   Melinde| Shilburne|Female|      Nowa Ruda| Assistant Professor|$57438.18|50.5774075| 16.4967184|    57438.18|   57438.1796875|       50.5774075|     57438.18|        0.0|
|  2|  Kimberly|Von Welden|Female|         Bulgan|       Programmer II|$62846.60|48.8231572|103.5218199|     62846.6|   62846.6015625|       48.8231572|      62846.6|        0.0|
|  4|   Shannon| O'Griffin|  Male|  Divnomorskoye|Budget/Accounting...|$61489.23|44.5047212| 38.1300171| 

In [22]:
df.groupBy("JobTitle").agg(sqlfunc.avg("female salary").alias("final_female_salary"))

DataFrame[JobTitle: string, final_female_salary: double]

In [23]:
df.groupBy("JobTitle").agg(sqlfunc.avg("male salary").alias("final_male_salary"))

DataFrame[JobTitle: string, final_male_salary: double]

In [27]:
# the third question

cityavg = df.groupBy("City").agg(sqlfunc.avg("clean_salary").alias("avgsalary"))

In [32]:
# sort the result
cityavg = cityavg.sort(col("avgsalary").desc())
cityavg.show()

+-----------------+-------------+
|             City|    avgsalary|
+-----------------+-------------+
|        Mesopotam|  99948.28125|
|       Zhongcheng| 99942.921875|
|           Caxias|99786.3984375|
|      Karangtawar|99638.9921875|
|        Itabaiana|  99502.15625|
|           Pasian|  99421.34375|
|           Webuye| 99368.546875|
|      Yuktae-dong| 99250.828125|
|           Zinder|  99222.84375|
|   Timiryazevskiy|   99142.9375|
|        Sawahbaru|99013.7109375|
|          Madimba|98737.8671875|
|         Huangshi|  98690.34375|
|          Gharyan|   98679.3125|
|         Yŏnan-ŭp| 98628.609375|
|     Wringinputih|98603.8203125|
|Monte da Boavista|  98586.71875|
|          Klukeng|98439.4921875|
|         Murmashi|  98226.15625|
|        Fox Creek|      98138.0|
+-----------------+-------------+
only showing top 20 rows



## More about df
loading, insepction

In [34]:
df.dtypes

[('id', 'string'),
 ('first_name', 'string'),
 ('last_name', 'string'),
 ('gender', 'string'),
 ('City', 'string'),
 ('JobTitle', 'string'),
 ('Salary', 'string'),
 ('Latitude', 'string'),
 ('Longitude', 'string'),
 ('clean_salary', 'float'),
 ('new salary', 'double'),
 ('lat', 'string'),
 ('female salary', 'float'),
 ('male salary', 'float')]

In [36]:
# change data types; no need to cast then
from pyspark.sql.types import *

schema = StructType([
    StructField("id",IntegerType()),
    StructField("first_name",StringType()),
    StructField("last_name",StringType()),
    StructField("gender",StringType()),
    StructField("city",StringType()),
    StructField("job_title",StringType()),
    StructField("salary",StringType()),
    StructField("latitude",FloatType()),
    StructField("longitude",FloatType())
                     ])
df2 = spark.read.csv("/content/drive/MyDrive/datasets/original.csv",header=True,schema=schema)

In [37]:
df2.dtypes

[('id', 'int'),
 ('first_name', 'string'),
 ('last_name', 'string'),
 ('gender', 'string'),
 ('city', 'string'),
 ('job_title', 'string'),
 ('salary', 'string'),
 ('latitude', 'float'),
 ('longitude', 'float')]

In [39]:
df2.head(6)

[Row(id=1, first_name='Melinde', last_name='Shilburne', gender='Female', city='Nowa Ruda', job_title='Assistant Professor', salary='$57438.18', latitude=50.57740783691406, longitude=16.49671745300293),
 Row(id=2, first_name='Kimberly', last_name='Von Welden', gender='Female', city='Bulgan', job_title='Programmer II', salary='$62846.60', latitude=48.823158264160156, longitude=103.52182006835938),
 Row(id=3, first_name='Alvera', last_name='Di Boldi', gender='Female', city=None, job_title=None, salary='$57576.52', latitude=39.994747161865234, longitude=116.33977508544922),
 Row(id=4, first_name='Shannon', last_name="O'Griffin", gender='Male', city='Divnomorskoye', job_title='Budget/Accounting Analyst II', salary='$61489.23', latitude=44.504722595214844, longitude=38.1300163269043),
 Row(id=5, first_name='Sherwood', last_name='Macieja', gender='Male', city='Mytishchi', job_title='VP Sales', salary='$63863.09', latitude=None, longitude=37.64899444580078),
 Row(id=6, first_name='Maris', last

In [42]:
df2.first()

Row(id=1, first_name='Melinde', last_name='Shilburne', gender='Female', city='Nowa Ruda', job_title='Assistant Professor', salary='$57438.18', latitude=50.57740783691406, longitude=16.49671745300293)

In [43]:
df2.describe().show() #give a basic descriptive analysis

+-------+-----------------+----------+---------+------+-------------------+-------------------+---------+------------------+-----------------+
|summary|               id|first_name|last_name|gender|               city|          job_title|   salary|          latitude|        longitude|
+-------+-----------------+----------+---------+------+-------------------+-------------------+---------+------------------+-----------------+
|  count|             1000|      1000|     1000|  1000|                999|                998|     1000|               999|             1000|
|   mean|            500.5|      null|     null|  null|               null|               null|     null| 25.43151724702484|43.33756460386515|
| stddev|288.8194360957494|      null|     null|  null|               null|               null|     null|24.579082550156635| 69.4206453674681|
|    min|                1|   Abagail|    Abbay|Female|             Abéché|Account Coordinator|$10101.92|         -54.28115|       -123.04196|

In [44]:
df2.columns

['id',
 'first_name',
 'last_name',
 'gender',
 'city',
 'job_title',
 'salary',
 'latitude',
 'longitude']

In [45]:
df2.count()

1000

In [46]:
df2.distinct().count() #non-repetitive counts

1000

## none and repetitions

In [47]:
# drop all nulls
df2_dropped = df2.na.drop()

In [49]:
# drop based on cols

df_null_job = df2.filter(df2.job_title.isNotNull())

In [52]:
# replace nulls with other values
df2= df2.withColumn("clean_city", when(df2.city.isNull(),"unknown").otherwise(df2.city))

In [53]:
# drop duplicates
df_no_dup = df.dropDuplicates()

## Selecting and filtering data

In [54]:
# col select

df_select1 = df2.select("first_name","last_name")
df_select1.head(5)

[Row(first_name='Melinde', last_name='Shilburne'),
 Row(first_name='Kimberly', last_name='Von Welden'),
 Row(first_name='Alvera', last_name='Di Boldi'),
 Row(first_name='Shannon', last_name="O'Griffin"),
 Row(first_name='Sherwood', last_name='Macieja')]

In [56]:
# col rename

df_rename = df.withColumnRenamed("first_name","fn")
df_rename.columns

['id',
 'fn',
 'last_name',
 'gender',
 'City',
 'JobTitle',
 'Salary',
 'Latitude',
 'Longitude',
 'clean_salary',
 'new salary',
 'lat',
 'female salary',
 'male salary']

In [58]:
# filter

df_filter1 = df2.filter((df2.first_name=="Alvera"))
df_filter1.show()

+---+----------+---------+------+----+---------+---------+---------+----------+----------+
| id|first_name|last_name|gender|city|job_title|   salary| latitude| longitude|clean_city|
+---+----------+---------+------+----+---------+---------+---------+----------+----------+
|  3|    Alvera| Di Boldi|Female|null|     null|$57576.52|39.994747|116.339775|   unknown|
+---+----------+---------+------+----+---------+---------+---------+----------+----------+



In [61]:
# filter with a value like ...

df_filter2 = df2.filter((df2.first_name.like("%ver%")))
df_filter2.show()

+---+----------+-----------+------+------------+--------------------+---------+---------+----------+------------+
| id|first_name|  last_name|gender|        city|           job_title|   salary| latitude| longitude|  clean_city|
+---+----------+-----------+------+------------+--------------------+---------+---------+----------+------------+
|  3|    Alvera|   Di Boldi|Female|        null|                null|$57576.52|39.994747|116.339775|     unknown|
|179|   Laverna|  Yuryichev|Female|  Lyaskovets| Marketing Assistant|$95106.34|43.068382| 25.742846|  Lyaskovets|
|775|   Alverta|   MacNulty|Female|  Megalópoli| Geological Engineer|$17299.62|37.401245| 22.136488|  Megalópoli|
|868|    Denver|      Olman|  Male| Ryazanskaya| Structural Engineer|$56653.70|44.956562| 39.585327| Ryazanskaya|
|909|     Avery|Tunniclisse|  Male|Karlovy Vary|          Pharmacist|$46071.13|50.231853| 12.871962|Karlovy Vary|
|923|    Traver|  Stillwell|  Male|      Perico|Occupational Ther...|$79520.50|22.768658

In [60]:
# filter with a value ending with
df_filter3 = df2.filter((df2.first_name.endswith("er")))
df_filter3.show()

+---+----------+----------+------+--------------+--------------------+---------+----------+----------+--------------+
| id|first_name| last_name|gender|          city|           job_title|   salary|  latitude| longitude|    clean_city|
+---+----------+----------+------+--------------+--------------------+---------+----------+----------+--------------+
|399| Silvester|    Hasely|  Male|        Tiaong|Research Assistan...|$75543.84| 13.943482| 121.36913|        Tiaong|
|448|    Lester|Langthorne|  Male|      HanHuang|    Junior Executive|$34173.94| 40.142128| 94.661964|      HanHuang|
|452|    Rutger|    Kirman|  Male|       Zhosaly|Database Administ...|$74509.57| 45.485245|  64.08186|       Zhosaly|
|456|    Garner|     Stern|  Male|    Pangawaren|Desktop Support T...|$96462.59|-7.3897543| 108.86406|    Pangawaren|
|551|   Hillyer|     Stych|  Male|Pont-à-Mousson|Systems Administr...|$88624.97| 48.930866|  6.037629|Pont-à-Mousson|
|694|   Olivier|  Stanlick|  Male|      Dongshan|     De

In [62]:
# filter between two numbers
# filter with a value ending with
df_filter4 = df2.filter((df2.id.between(1,3)))
df_filter4.show()

+---+----------+----------+------+---------+-------------------+---------+---------+----------+----------+
| id|first_name| last_name|gender|     city|          job_title|   salary| latitude| longitude|clean_city|
+---+----------+----------+------+---------+-------------------+---------+---------+----------+----------+
|  1|   Melinde| Shilburne|Female|Nowa Ruda|Assistant Professor|$57438.18|50.577408| 16.496717| Nowa Ruda|
|  2|  Kimberly|Von Welden|Female|   Bulgan|      Programmer II|$62846.60| 48.82316| 103.52182|    Bulgan|
|  3|    Alvera|  Di Boldi|Female|     null|               null|$57576.52|39.994747|116.339775|   unknown|
+---+----------+----------+------+---------+-------------------+---------+---------+----------+----------+



In [63]:
# filer based on a list
df_filter5 = df2.filter((df2.id.isin(1,3)))
df_filter5.show()

+---+----------+---------+------+---------+-------------------+---------+---------+----------+----------+
| id|first_name|last_name|gender|     city|          job_title|   salary| latitude| longitude|clean_city|
+---+----------+---------+------+---------+-------------------+---------+---------+----------+----------+
|  1|   Melinde|Shilburne|Female|Nowa Ruda|Assistant Professor|$57438.18|50.577408| 16.496717| Nowa Ruda|
|  3|    Alvera| Di Boldi|Female|     null|               null|$57576.52|39.994747|116.339775|   unknown|
+---+----------+---------+------+---------+-------------------+---------+---------+----------+----------+



In [65]:
# filter with truncation/substring
df_substr = df2.select(df2.first_name,df2.first_name.substr(1,3).alias("substring"))
df_substr.head(5)

[Row(first_name='Melinde', substring='Mel'),
 Row(first_name='Kimberly', substring='Kim'),
 Row(first_name='Alvera', substring='Alv'),
 Row(first_name='Shannon', substring='Sha'),
 Row(first_name='Sherwood', substring='She')]

In [67]:
# use multiple filters

#filter((standard1) & (standard2))

df_filter5 = df2.filter((df2.id.isin(1,3)) & (df2.first_name == "Alvera") )
df_filter5.show()

+---+----------+---------+------+----+---------+---------+---------+----------+----------+
| id|first_name|last_name|gender|city|job_title|   salary| latitude| longitude|clean_city|
+---+----------+---------+------+----+---------+---------+---------+----------+----------+
|  3|    Alvera| Di Boldi|Female|null|     null|$57576.52|39.994747|116.339775|   unknown|
+---+----------+---------+------+----+---------+---------+---------+----------+----------+



In [69]:
# or: |
df_filter5 = df2.filter((df2.id.isin(1,3)) | (df2.first_name == "Alvera") )
df_filter5.show()

+---+----------+---------+------+---------+-------------------+---------+---------+----------+----------+
| id|first_name|last_name|gender|     city|          job_title|   salary| latitude| longitude|clean_city|
+---+----------+---------+------+---------+-------------------+---------+---------+----------+----------+
|  1|   Melinde|Shilburne|Female|Nowa Ruda|Assistant Professor|$57438.18|50.577408| 16.496717| Nowa Ruda|
|  3|    Alvera| Di Boldi|Female|     null|               null|$57576.52|39.994747|116.339775|   unknown|
+---+----------+---------+------+---------+-------------------+---------+---------+----------+----------+



## run SQL on dataframes
need to register a table for sql requests

In [72]:
df = spark.read.csv("/content/drive/MyDrive/datasets/original.csv",header=True)

In [73]:
df.registerTempTable("original")

In [74]:
query1 = spark.sql("select * from original")
query1.show()

+---+----------+----------+------+---------------+--------------------+---------+----------+-----------+
| id|first_name| last_name|gender|           City|            JobTitle|   Salary|  Latitude|  Longitude|
+---+----------+----------+------+---------------+--------------------+---------+----------+-----------+
|  1|   Melinde| Shilburne|Female|      Nowa Ruda| Assistant Professor|$57438.18|50.5774075| 16.4967184|
|  2|  Kimberly|Von Welden|Female|         Bulgan|       Programmer II|$62846.60|48.8231572|103.5218199|
|  3|    Alvera|  Di Boldi|Female|           null|                null|$57576.52|39.9947462|116.3397725|
|  4|   Shannon| O'Griffin|  Male|  Divnomorskoye|Budget/Accounting...|$61489.23|44.5047212| 38.1300171|
|  5|  Sherwood|   Macieja|  Male|      Mytishchi|            VP Sales|$63863.09|      null| 37.6489954|
|  6|     Maris|      Folk|Female|Kinsealy-Drinan|      Civil Engineer|$30101.16|53.4266145| -6.1644997|
|  7|     Masha|    Divers|Female|         Dachun|     

In [76]:
query2 = spark.sql('select concat(first_name, " ", last_name) as full_name from original where gender = "Female"')
query2.show()

+-------------------+
|          full_name|
+-------------------+
|  Melinde Shilburne|
|Kimberly Von Welden|
|    Alvera Di Boldi|
|         Maris Folk|
|       Masha Divers|
|     Kylynn Lockart|
|         Rey Meharg|
|      Claude Briant|
|  Tiffanie Pattison|
|    Lurleen Janczak|
|      Nichol Holtum|
|       Shaun Bridle|
|     Leandra Anfrey|
|    Jaquelyn Hazard|
|  Prudence Honacker|
|       Cherey Liger|
|          Neda Krop|
|    Barbi Fattorini|
|   Lonnie Townshend|
|    Valida Salzberg|
+-------------------+
only showing top 20 rows



## Add calculated columns

In [78]:
# slary col has $ so we cast format changes before

df = df.withColumn('clean_salary',df.Salary.substr(2,100).cast('float')) # change the dtype
df.head(5)

[Row(id='1', first_name='Melinde', last_name='Shilburne', gender='Female', City='Nowa Ruda', JobTitle='Assistant Professor', Salary='$57438.18', Latitude='50.5774075', Longitude='16.4967184', clean_salary=57438.1796875),
 Row(id='2', first_name='Kimberly', last_name='Von Welden', gender='Female', City='Bulgan', JobTitle='Programmer II', Salary='$62846.60', Latitude='48.8231572', Longitude='103.5218199', clean_salary=62846.6015625),
 Row(id='3', first_name='Alvera', last_name='Di Boldi', gender='Female', City=None, JobTitle=None, Salary='$57576.52', Latitude='39.9947462', Longitude='116.3397725', clean_salary=57576.51953125),
 Row(id='4', first_name='Shannon', last_name="O'Griffin", gender='Male', City='Divnomorskoye', JobTitle='Budget/Accounting Analyst II', Salary='$61489.23', Latitude='44.5047212', Longitude='38.1300171', clean_salary=61489.23046875),
 Row(id='5', first_name='Sherwood', last_name='Macieja', gender='Male', City='Mytishchi', JobTitle='VP Sales', Salary='$63863.09', Lat

In [79]:
df = df.withColumn("monthly_salary",df.clean_salary/12)
df.head(3)

[Row(id='1', first_name='Melinde', last_name='Shilburne', gender='Female', City='Nowa Ruda', JobTitle='Assistant Professor', Salary='$57438.18', Latitude='50.5774075', Longitude='16.4967184', clean_salary=57438.1796875, monthly_salary=4786.514973958333),
 Row(id='2', first_name='Kimberly', last_name='Von Welden', gender='Female', City='Bulgan', JobTitle='Programmer II', Salary='$62846.60', Latitude='48.8231572', Longitude='103.5218199', clean_salary=62846.6015625, monthly_salary=5237.216796875),
 Row(id='3', first_name='Alvera', last_name='Di Boldi', gender='Female', City=None, JobTitle=None, Salary='$57576.52', Latitude='39.9947462', Longitude='116.3397725', clean_salary=57576.51953125, monthly_salary=4798.043294270833)]

In [80]:
# boolean col
df = df.withColumn("are they female", when(df.gender == "Female","yes").otherwise("no"))
df.head(2)

[Row(id='1', first_name='Melinde', last_name='Shilburne', gender='Female', City='Nowa Ruda', JobTitle='Assistant Professor', Salary='$57438.18', Latitude='50.5774075', Longitude='16.4967184', clean_salary=57438.1796875, monthly_salary=4786.514973958333, are they female='yes'),
 Row(id='2', first_name='Kimberly', last_name='Von Welden', gender='Female', City='Bulgan', JobTitle='Programmer II', Salary='$62846.60', Latitude='48.8231572', Longitude='103.5218199', clean_salary=62846.6015625, monthly_salary=5237.216796875, are they female='yes')]

In [84]:
# df = df.drop("monthly_salary","are they female")

## Group by aggregations

In [86]:
df1 = df.groupBy('gender').agg(sqlfunc.mean('clean_salary').alias('total'),
                               sqlfunc.sum('clean_salary').alias('average'),
                               sqlfunc.max('clean_salary').alias('max'),
                               sqlfunc.min('clean_salary').alias('min'))
df1.show()

+------+-----------------+--------------------+--------+--------+
|gender|            total|             average|     max|     min|
+------+-----------------+--------------------+--------+--------+
|Female|55618.94298820185|2.7364519950195312E7|99948.28|10616.44|
|  Male|55361.09385573019|2.8123435678710938E7|99942.92|10101.92|
+------+-----------------+--------------------+--------+--------+



In [87]:
# add more grouping filter

df1 = df.groupBy('gender','city').agg(sqlfunc.mean('clean_salary').alias('total'),
                               sqlfunc.sum('clean_salary').alias('average'),
                               sqlfunc.max('clean_salary').alias('max'),
                               sqlfunc.min('clean_salary').alias('min'))
df1.show()

+------+-----------------+----------------+----------------+--------+--------+
|gender|             city|           total|         average|     max|     min|
+------+-----------------+----------------+----------------+--------+--------+
|Female|           Dachun| 25090.869140625| 25090.869140625|25090.87|25090.87|
|Female|      Trollhättan|53311.6845703125|106623.369140625| 79792.9|26830.47|
|  Male|          Wenshao| 18941.509765625| 18941.509765625|18941.51|18941.51|
|Female|            Lanas| 13765.900390625| 13765.900390625| 13765.9| 13765.9|
|  Male|            Mörön|    77940.078125|    77940.078125|77940.08|77940.08|
|Female|             Same|   73369.7265625|   73369.7265625|73369.73|73369.73|
|Female|          Sawahan|  24608.83984375|  24608.83984375|24608.84|24608.84|
|  Male|Monte da Boavista|     98586.71875|     98586.71875|98586.72|98586.72|
|Female|         Nusajaya|    71637.921875|    71637.921875|71637.92|71637.92|
|Female|            Kista|   96192.3984375|   96192.

## Write df into files

In [None]:
# df1.write.csv('df1.csv')
# df1.write.json('df1.json')
# df1.write.parquet('df1.parquet')

Project practice

In [88]:
df = spark.read.csv("/content/drive/MyDrive/datasets/challenge.csv",header=True)

In [89]:
# Q1: add a column to say whether the country is mexico

df = df.withColumn("mexico?",when(df.Country=="Mexico","yes").otherwise("no"))
df.head(5)

[Row(ip_address='52.81.192.172', Country='China', Domain Name='odnoklassniki.ru', Bytes_used='463', mexico?='no'),
 Row(ip_address='119.239.207.13', Country='China', Domain Name='youtu.be', Bytes_used='51', mexico?='no'),
 Row(ip_address='68.69.217.210', Country='China', Domain Name='adobe.com', Bytes_used='10', mexico?='no'),
 Row(ip_address='7.191.21.223', Country='Bulgaria', Domain Name='linkedin.com', Bytes_used='853', mexico?='no'),
 Row(ip_address='211.13.10.68', Country='Indonesia', Domain Name='hud.gov', Bytes_used='29', mexico?='no')]

In [90]:
# Q2: group by your new column and sum of bytes used
df2 = df.groupBy("mexico?").agg(sqlfunc.sum("Bytes_used").alias('sum of bytes'))
df2.show()

+-------+------------+
|mexico?|sum of bytes|
+-------+------------+
|     no|    508076.0|
|    yes|      6293.0|
+-------+------------+



In [97]:
# groupby country and use sqlfunc.countDistinct to count number of ips in each country

df3 = df.groupBy("Country").agg(sqlfunc.countDistinct("ip_address").alias("ip_count"))
df3 = df3.sort(col("ip_count").desc())
df3.show()

+--------------+--------+
|       Country|ip_count|
+--------------+--------+
|         China|     172|
|     Indonesia|     114|
|   Philippines|      65|
|        Russia|      56|
|        Brazil|      35|
|        Poland|      31|
|        Sweden|      28|
|         Japan|      25|
|      Portugal|      23|
|Czech Republic|      23|
|        France|      21|
|          Peru|      19|
|      Colombia|      17|
| United States|      15|
|       Ukraine|      14|
|     Argentina|      14|
|        Mexico|      13|
|      Thailand|      12|
|       Nigeria|      11|
|        Canada|      11|
+--------------+--------+
only showing top 20 rows

