# Exercises

In [25]:
import pyspark
import numpy as np
import pandas as pd
import pydataset

spark = pyspark.sql.SparkSession.builder.getOrCreate()
# Note: The pyspark avg and mean functions are aliases of eachother
from pyspark.sql.functions import concat, sum, avg, min, max, count, mean
from pyspark.sql.functions import lit
from pyspark.sql.functions import *

import warnings
warnings.filterwarnings('ignore')

### 1.Create a spark data frame that contains your favorite programming languages.

- The name of the column should be language
- View the schema of the dataframe
- Output the shape of the dataframe
- Show the first 5 records in the dataframe

In [2]:
#create a dictionary
pl = {
    'language': ['python', 'sql', 'c++', 'java', 'pascal']
    
}

In [3]:
#create a pandas df
pandas_dataframe = pd.DataFrame(pl)
pandas_dataframe

Unnamed: 0,language
0,python
1,sql
2,c++
3,java
4,pascal


In [4]:
#other way
pl = pd.DataFrame(
    {"language": ["r", "python", "sql", "julia", "pig latin", "lorem ipsum"]}
)
df = spark.createDataFrame(pl)

In [5]:
#create aspark df
df = spark.createDataFrame(pandas_dataframe)
df

DataFrame[language: string]

In [6]:
#schema
df.printSchema()

root
 |-- language: string (nullable = true)



In [7]:
#shape
print((df.count(), len(df.columns)))

[Stage 0:>                                                          (0 + 8) / 8]

(5, 1)


                                                                                

In [8]:
# check the first 5 rows
df.show(5)

+--------+
|language|
+--------+
|  python|
|     sql|
|     c++|
|    java|
|  pascal|
+--------+



###  2. Load the mpg dataset as a spark dataframe.

In [9]:
from pydataset import data

mpg = spark.createDataFrame(data("mpg"))
mpg.show(5)

+------------+-----+-----+----+---+----------+---+---+---+---+-------+
|manufacturer|model|displ|year|cyl|     trans|drv|cty|hwy| fl|  class|
+------------+-----+-----+----+---+----------+---+---+---+---+-------+
|        audi|   a4|  1.8|1999|  4|  auto(l5)|  f| 18| 29|  p|compact|
|        audi|   a4|  1.8|1999|  4|manual(m5)|  f| 21| 29|  p|compact|
|        audi|   a4|  2.0|2008|  4|manual(m6)|  f| 20| 31|  p|compact|
|        audi|   a4|  2.0|2008|  4|  auto(av)|  f| 21| 30|  p|compact|
|        audi|   a4|  2.8|1999|  6|  auto(l5)|  f| 16| 26|  p|compact|
+------------+-----+-----+----+---+----------+---+---+---+---+-------+
only showing top 5 rows



    a. Create 1 column of output that contains a message like the one below:


    ```The 1999 audi a4 has a 4 cylinder engine.```

    For each vehicle.

In [10]:
mpg.select('*', concat(lit("The "),mpg.year, mpg.manufacturer, lit(" "), mpg.model,
       lit(" has a  "), mpg.cyl, lit(" cylinder engine" )).alias('description')).show(truncate=False)

+------------+------------------+-----+----+---+----------+---+---+---+---+-------+-------------------------------------------------------------+
|manufacturer|model             |displ|year|cyl|trans     |drv|cty|hwy|fl |class  |description                                                  |
+------------+------------------+-----+----+---+----------+---+---+---+---+-------+-------------------------------------------------------------+
|audi        |a4                |1.8  |1999|4  |auto(l5)  |f  |18 |29 |p  |compact|The 1999audi a4 has a  4 cylinder engine                     |
|audi        |a4                |1.8  |1999|4  |manual(m5)|f  |21 |29 |p  |compact|The 1999audi a4 has a  4 cylinder engine                     |
|audi        |a4                |2.0  |2008|4  |manual(m6)|f  |20 |31 |p  |compact|The 2008audi a4 has a  4 cylinder engine                     |
|audi        |a4                |2.0  |2008|4  |auto(av)  |f  |21 |30 |p  |compact|The 2008audi a4 has a  4 cylinder engine 

In [11]:
#here I'm just geting one column in the df
new_df = mpg.select('*', concat(lit("The "),mpg.year, mpg.manufacturer, lit(" "), mpg.model,
       lit(" has a  "), mpg.cyl, lit(" cylinder engine" )).alias('description'))

In [12]:
#this is the new column
new_col = concat(lit("The "),mpg.year, mpg.manufacturer, lit(" "), mpg.model,
        lit(" has a  "), mpg.cyl, lit(" cylinder engine" ))

In [13]:
#new df with the existing 
new_mpg =mpg.select('*', new_col.alias('description'))

In [14]:
new_mpg.show()

+------------+------------------+-----+----+---+----------+---+---+---+---+-------+--------------------+
|manufacturer|             model|displ|year|cyl|     trans|drv|cty|hwy| fl|  class|         description|
+------------+------------------+-----+----+---+----------+---+---+---+---+-------+--------------------+
|        audi|                a4|  1.8|1999|  4|  auto(l5)|  f| 18| 29|  p|compact|The 1999audi a4 h...|
|        audi|                a4|  1.8|1999|  4|manual(m5)|  f| 21| 29|  p|compact|The 1999audi a4 h...|
|        audi|                a4|  2.0|2008|  4|manual(m6)|  f| 20| 31|  p|compact|The 2008audi a4 h...|
|        audi|                a4|  2.0|2008|  4|  auto(av)|  f| 21| 30|  p|compact|The 2008audi a4 h...|
|        audi|                a4|  2.8|1999|  6|  auto(l5)|  f| 16| 26|  p|compact|The 1999audi a4 h...|
|        audi|                a4|  2.8|1999|  6|manual(m5)|  f| 18| 26|  p|compact|The 1999audi a4 h...|
|        audi|                a4|  3.1|2008|  6|  auto(

    b. Transform the trans column so that it only contains either manual or auto.

In [15]:
from pyspark.sql.functions import expr
from pyspark.sql.functions import when

In [16]:
mpg.select(mpg.trans, when((mpg.trans == 'manual(m5)') | (mpg.trans == 'manual(m6)'), 'manual').otherwise("auto")).show()

+----------+----------------------------------------------------------------------------------+
|     trans|CASE WHEN ((trans = manual(m5)) OR (trans = manual(m6))) THEN manual ELSE auto END|
+----------+----------------------------------------------------------------------------------+
|  auto(l5)|                                                                              auto|
|manual(m5)|                                                                            manual|
|manual(m6)|                                                                            manual|
|  auto(av)|                                                                              auto|
|  auto(l5)|                                                                              auto|
|manual(m5)|                                                                            manual|
|  auto(av)|                                                                              auto|
|manual(m5)|                            

In [17]:
# multiple ways to do it 
mpg.select(
    'trans',
    regexp_extract("trans", r"^(\w+)\(", 1).alias("regexp_extract"),
    regexp_replace("trans", r"\(.+$", "").alias("regexp_replace"),
    when(
        mpg.trans.like("auto%"), "auto"
    ).otherwise("manual").alias("when + like")
).show()

+----------+--------------+--------------+-----------+
|     trans|regexp_extract|regexp_replace|when + like|
+----------+--------------+--------------+-----------+
|  auto(l5)|          auto|          auto|       auto|
|manual(m5)|        manual|        manual|     manual|
|manual(m6)|        manual|        manual|     manual|
|  auto(av)|          auto|          auto|       auto|
|  auto(l5)|          auto|          auto|       auto|
|manual(m5)|        manual|        manual|     manual|
|  auto(av)|          auto|          auto|       auto|
|manual(m5)|        manual|        manual|     manual|
|  auto(l5)|          auto|          auto|       auto|
|manual(m6)|        manual|        manual|     manual|
|  auto(s6)|          auto|          auto|       auto|
|  auto(l5)|          auto|          auto|       auto|
|manual(m5)|        manual|        manual|     manual|
|  auto(s6)|          auto|          auto|       auto|
|manual(m6)|        manual|        manual|     manual|
|  auto(l5

3. Load the tips dataset as a spark dataframe.

In [18]:
tips = pydataset.data('tips')

In [19]:
tips = spark.createDataFrame(tips)
tips.show()

+----------+----+------+------+---+------+----+
|total_bill| tip|   sex|smoker|day|  time|size|
+----------+----+------+------+---+------+----+
|     16.99|1.01|Female|    No|Sun|Dinner|   2|
|     10.34|1.66|  Male|    No|Sun|Dinner|   3|
|     21.01| 3.5|  Male|    No|Sun|Dinner|   3|
|     23.68|3.31|  Male|    No|Sun|Dinner|   2|
|     24.59|3.61|Female|    No|Sun|Dinner|   4|
|     25.29|4.71|  Male|    No|Sun|Dinner|   4|
|      8.77| 2.0|  Male|    No|Sun|Dinner|   2|
|     26.88|3.12|  Male|    No|Sun|Dinner|   4|
|     15.04|1.96|  Male|    No|Sun|Dinner|   2|
|     14.78|3.23|  Male|    No|Sun|Dinner|   2|
|     10.27|1.71|  Male|    No|Sun|Dinner|   2|
|     35.26| 5.0|Female|    No|Sun|Dinner|   4|
|     15.42|1.57|  Male|    No|Sun|Dinner|   2|
|     18.43| 3.0|  Male|    No|Sun|Dinner|   4|
|     14.83|3.02|Female|    No|Sun|Dinner|   2|
|     21.58|3.92|  Male|    No|Sun|Dinner|   2|
|     10.33|1.67|Female|    No|Sun|Dinner|   3|
|     16.29|3.71|  Male|    No|Sun|Dinne

    a. What percentage of observations are smokers?

In [20]:
# number of smokers
smokers = tips.where(tips.smoker == 'Yes').count()

In [21]:
smokers

93

In [22]:
#number of customers
total = tips.count()

In [23]:
total

244

In [26]:
round(smokers / total * 100 , 2)

TypeError: Invalid argument, not a string or column: 38.114754098360656 of type <class 'float'>. For column literals, use 'lit', 'array', 'struct' or 'create_map' function.

In [None]:
print(f' The percentage of customers that are smokers is : {round(smokers / total * 100 , 2)} %')

In [None]:
#other way to do it

In [None]:

tips.groupBy("smoker").count().show()

In [None]:
tips.groupBy("smoker").count().withColumn(
    "percent",
    concat(round((col("count") / tips.count() * 100), 0).cast("int"), lit("%")),
).show()

    b. Create a column that contains the tip percentage

In [None]:
col = (tips.tip / tips.total_bill)
col

In [None]:
tips.select(col).show(5)

In [None]:
tips_pct = tips.select('*', col.alias('tip_pct'))

In [None]:
tips_pct.show(5)

In [None]:
#other way
tips.withColumn("tip_percentage", col('tip') / col('total_bill')).show()

    c. Calculate the average tip percentage for each combination of sex and smoker.

In [None]:
tips_pct.groupBy('sex', 'smoker').agg(mean('tip_pct')).show()

In [None]:
#other way to do it
(
    tips.withColumn("tip_percentage", col('tip') / col('total_bill'))
    .groupby("sex")
    .pivot("smoker")
    .agg(round(mean("tip_percentage"), 4))
    .show()
)

### 4. Use the seattle weather dataset referenced in the lesson to answer the questions below.

In [None]:
from vega_datasets import data

weather = data.seattle_weather().assign(date=lambda df: df.date.astype(str))
weather = spark.createDataFrame(weather)
weather.show(6)

- Convert the temperatures to fahrenheit. formula (0°C × 9/5) + 32

In [None]:
temp_maxC= (weather.temp_max * 9/5) + 32

In [None]:
tem_minC = (weather.temp_min * 9/5) + 32

In [None]:
weather = weather.withColumn(
    "temp_max", (col("temp_max") *9 /5 +32)
)

In [None]:
weather = weather.withColumn(
    "temp_min", (col("temp_min") *9 /5 +32)
)

In [None]:
weather.show(5)

In [None]:
#other way to do it
# pandas equivalent -- df.temp_max = df.temp_max * 9 / 5 + 32

weather = weather.withColumn(
    "temp_max", (col("temp_max") * 9 / 5 + 32)
).withColumn("temp_min", (col("temp_min") * 9 / 5 + 32))

In [None]:
weather.show(4)

- Which month has the most rain, on average?

In [None]:
(
    weather.withColumn("month", month("date"))
    .withColumn("year", year("date"))
    .groupBy("month", "year")
    .agg(sum("precipitation").alias("total_rainfall"))
    .sort("total_rainfall")
    .show()
)

In [None]:
 (
    weather.withColumn("month", month("date"))
    .withColumn("year", year("date"))
    .groupBy("month", "year")
    .agg(sum("precipitation").alias("total_monthly_precipitation"))
    .groupBy("month")
    .agg(mean("total_monthly_precipitation").alias("avg_monthly_rain"))
    .sort(col("avg_monthly_rain").desc())
).show(5)


In [None]:
row = (
    weather.withColumn("month", month("date"))
    .withColumn("year", year("date"))
    .groupBy("month", "year")
    .agg(sum("precipitation").alias("total_monthly_precipitation"))
    .groupBy("month")
    .agg(mean("total_monthly_precipitation").alias("avg_monthly_rain"))
    .sort(col("avg_monthly_rain").desc())
    .first()
)
row

In [None]:
row?

- Which year was the windiest?

In [None]:
(
weather.withColumn("year", year("date"))
    .groupBy("year")
    .agg(sum("wind").alias("total_wind")).show(5)
)

In [None]:
(
weather.withColumn("year", year("date"))
    .groupBy("year")
    .agg(sum("wind").alias("total_wind")).show(5)
)

- What is the most frequent type of weather in January?

- What is the average high and low temperature on sunny days in July in 2013 and 2014?

- What percentage of days were rainy in q3 of 2015?

- For each year, find what percentage of days it rained (had non-zero precipitation).