In [1]:
import pyspark
import pandas as pd
from pydataset import data
from pyspark.sql import functions as f

In [2]:
# create session
spark = pyspark.sql.SparkSession.builder.getOrCreate()

Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
23/05/19 09:36:52 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable


In [3]:
# creating spark data frame
df = spark.createDataFrame(pd.DataFrame({'language': ['python', 'sql', 'r', 'java', 'javascript', 'c', 'ruby']}))

In [4]:
df.printSchema()

root
 |-- language: string (nullable = true)



In [5]:
df.describe().show()

[Stage 0:>                                                        (0 + 10) / 10]

+-------+--------+
|summary|language|
+-------+--------+
|  count|       7|
|   mean|    null|
| stddev|    null|
|    min|       c|
|    max|     sql|
+-------+--------+



                                                                                

In [6]:
df.show(5)

+----------+
|  language|
+----------+
|    python|
|       sql|
|         r|
|      java|
|javascript|
+----------+
only showing top 5 rows



In [7]:
mpg = data('mpg')

In [8]:
# creating zpark df
df = spark.createDataFrame(mpg)

In [9]:
df.show(5)

+------------+-----+-----+----+---+----------+---+---+---+---+-------+
|manufacturer|model|displ|year|cyl|     trans|drv|cty|hwy| fl|  class|
+------------+-----+-----+----+---+----------+---+---+---+---+-------+
|        audi|   a4|  1.8|1999|  4|  auto(l5)|  f| 18| 29|  p|compact|
|        audi|   a4|  1.8|1999|  4|manual(m5)|  f| 21| 29|  p|compact|
|        audi|   a4|  2.0|2008|  4|manual(m6)|  f| 20| 31|  p|compact|
|        audi|   a4|  2.0|2008|  4|  auto(av)|  f| 21| 30|  p|compact|
|        audi|   a4|  2.8|1999|  6|  auto(l5)|  f| 16| 26|  p|compact|
+------------+-----+-----+----+---+----------+---+---+---+---+-------+
only showing top 5 rows



In [10]:
df.select(
    f.concat(f.lit('The '),
             f.expr('year'),
             f.lit(' '),
             f.expr('class'),
             f.lit(' '),
             f.expr('model'),
             f.lit(' has a '),
             f.expr('cyl'),
             f.lit(' cylinder engine.')
            ).alias('sentence')).show()

+--------------------+
|            sentence|
+--------------------+
|The 1999 compact ...|
|The 1999 compact ...|
|The 2008 compact ...|
|The 2008 compact ...|
|The 1999 compact ...|
|The 1999 compact ...|
|The 2008 compact ...|
|The 1999 compact ...|
|The 1999 compact ...|
|The 2008 compact ...|
|The 2008 compact ...|
|The 1999 compact ...|
|The 1999 compact ...|
|The 2008 compact ...|
|The 2008 compact ...|
|The 1999 midsize ...|
|The 2008 midsize ...|
|The 2008 midsize ...|
|The 2008 suv c150...|
|The 2008 suv c150...|
+--------------------+
only showing top 20 rows



In [11]:
df.select(df.trans.startswith('a')).show()

+--------------------+
|startswith(trans, a)|
+--------------------+
|                true|
|               false|
|               false|
|                true|
|                true|
|               false|
|                true|
|               false|
|                true|
|               false|
|                true|
|                true|
|               false|
|                true|
|               false|
|                true|
|                true|
|                true|
|                true|
|                true|
+--------------------+
only showing top 20 rows



In [12]:
df.select(
    f.when(df.trans.startswith('a'), 'auto').otherwise('manual').alias('transmission')
).show()



+------------+
|transmission|
+------------+
|        auto|
|      manual|
|      manual|
|        auto|
|        auto|
|      manual|
|        auto|
|      manual|
|        auto|
|      manual|
|        auto|
|        auto|
|      manual|
|        auto|
|      manual|
|        auto|
|        auto|
|        auto|
|        auto|
|        auto|
+------------+
only showing top 20 rows



In [13]:
# loading the tips data set
tips = data('tips')

In [14]:
# making a sparrk df
df = spark.createDataFrame(tips)

In [19]:
df.select('smoker').describe().show()

+-------+------+
|summary|smoker|
+-------+------+
|  count|   244|
|   mean|  null|
| stddev|  null|
|    min|    No|
|    max|   Yes|
+-------+------+



[Stage 14:>                                                       (0 + 10) / 10]                                                                                

In [30]:
# hwrta percentage of the oberservations are smokers?
# smokers / totalobs * 100
df.select(
    f.concat(f.round((f.avg(f.when(df.smoker == 'Yes', 1).\
    otherwise(0)) * 100), 2), f.lit('%')).alias('smoker_perc')
).show()

+-----------+
|smoker_perc|
+-----------+
|     38.11%|
+-----------+



In [32]:
# Create a column that contains the tip percentage
df.show(1)

+----------+----+------+------+---+------+----+
|total_bill| tip|   sex|smoker|day|  time|size|
+----------+----+------+------+---+------+----+
|     16.99|1.01|Female|    No|Sun|Dinner|   2|
+----------+----+------+------+---+------+----+
only showing top 1 row



In [49]:
tip_percentage = f.round(df.tip / df.total_bill * 100, 2).alias('tip_perc')
df.select(
    tip_percentage
).show()

+--------+
|tip_perc|
+--------+
|    5.94|
|   16.05|
|   16.66|
|   13.98|
|   14.68|
|   18.62|
|   22.81|
|   11.61|
|   13.03|
|   21.85|
|   16.65|
|   14.18|
|   10.18|
|   16.28|
|   20.36|
|   18.16|
|   16.17|
|   22.77|
|   20.62|
|   16.22|
+--------+
only showing top 20 rows



In [51]:
# Calculate the average tip percentage for each combination of sex and smoker.
df.groupby('smoker').pivot('sex').agg(f.avg(tip_percentage)).show()

[Stage 87:>                                                       (0 + 10) / 10]

+------+------------------+------------------+
|smoker|            Female|              Male|
+------+------------------+------------------+
|    No| 15.69111111111111|16.066597938144334|
|   Yes|18.214545454545455|15.276666666666667|
+------+------------------+------------------+



                                                                                

In [53]:
from vega_datasets import data

weather = data.seattle_weather().assign(date=lambda df: df.date.astype(str))
df = spark.createDataFrame(weather)
df.show(6)

+----------+-------------+--------+--------+----+-------+
|      date|precipitation|temp_max|temp_min|wind|weather|
+----------+-------------+--------+--------+----+-------+
|2012-01-01|          0.0|    12.8|     5.0| 4.7|drizzle|
|2012-01-02|         10.9|    10.6|     2.8| 4.5|   rain|
|2012-01-03|          0.8|    11.7|     7.2| 2.3|   rain|
|2012-01-04|         20.3|    12.2|     5.6| 4.7|   rain|
|2012-01-05|          1.3|     8.9|     2.8| 6.1|   rain|
|2012-01-06|          2.5|     4.4|     2.2| 2.2|   rain|
+----------+-------------+--------+--------+----+-------+
only showing top 6 rows



In [59]:
# Convert the temperatures to fahrenheit.
farenheit_max = f.round((df.temp_max * 1.8) + 32, 1).alias('temp_max_f')
farenheit_min = f.round((df.temp_min * 1.8) + 32, 1).alias('temp_min_f')

# view change
df.select(
    farenheit_max,
    farenheit_min
).show(5)

+----------+----------+
|temp_max_f|temp_min_f|
+----------+----------+
|      55.0|      41.0|
|      51.1|      37.0|
|      53.1|      45.0|
|      54.0|      42.1|
|      48.0|      37.0|
+----------+----------+
only showing top 5 rows



In [73]:
# Which month has the most rain, on average?
(
    df.withColumn('month', f.month('date'))
    .groupby('month')
    .agg(f.avg('precipitation'))
    .alias('avg_rain')
    .sort(f.desc('`avg_rain`.`avg(precipitation)`' ))
    .show()
)

[Stage 114:=====>                                                  (1 + 9) / 10]

+-----+-------------------+
|month| avg(precipitation)|
+-----+-------------------+
|   11|  5.354166666666667|
|   12|  5.021774193548388|
|    3|  4.888709677419355|
|   10|  4.059677419354839|
|    1| 3.7580645161290316|
|    2|  3.734513274336283|
|    4|  3.128333333333333|
|    9| 1.9624999999999997|
|    5| 1.6733870967741935|
|    8| 1.3201612903225806|
|    6| 1.1075000000000002|
|    7|0.38870967741935486|
+-----+-------------------+



                                                                                

In [83]:
# Which year was the windiest?
(
    df.withColumn('year', f.year('date'))
    .groupby('year')
    .agg(f.sum('wind'))
    .sort(f.desc('sum(wind)'))
    .show(5)
)

+----+------------------+
|year|         sum(wind)|
+----+------------------+
|2012|            1244.7|
|2014|            1236.5|
|2015|1153.2999999999997|
|2013|1100.8000000000002|
+----+------------------+



In [97]:
# What is the most frequent type of weather in January?
(
    df.withColumn('month', f.month('date'))
    .filter(f.month('date') == 1)
    .groupby(f.month('date'))
    .pivot('weather')
    .count()
    .show()
)

+-----------+-------+---+----+----+---+
|month(date)|drizzle|fog|rain|snow|sun|
+-----------+-------+---+----+----+---+
|          1|     10| 38|  35|   8| 33|
+-----------+-------+---+----+----+---+



In [108]:
# What is the average high and low temperature on sunny days in July in 2013 and 2014?
(
    df
    .filter(df.weather == 'sun')
    .filter(f.month('date') == 7)
    .filter((f.year('date') == 2013) | (f.year('date') == 2014))
    .agg(f.avg('temp_max'), f.avg('temp_min'))
    .show()
)

+------------------+-----------------+
|     avg(temp_max)|    avg(temp_min)|
+------------------+-----------------+
|26.828846153846158|14.18269230769231|
+------------------+-----------------+



In [116]:
# What percentage of days were rainy in q3 of 2015?
(
    df
    .filter(f.year('date') == 2015)
    .filter(f.quarter('date') == 3)
    .select(
        f.when(df.weather == 'rainy', 1)
        .otherwise(0)
        .alias('rain')
    )
    .show()
)

TypeError: 'Column' object is not callable

In [150]:
(
    df.withColumn("quarter", f.quarter("date"))
    .withColumn("year", f.year("date")).select(
    
    ).agg()
    .show()
)

+-------------------------------------------------------------------+
|round((avg(CASE WHEN (weather = rain) THEN 1 ELSE 0 END) * 100), 2)|
+-------------------------------------------------------------------+
|                                                              17.73|
+-------------------------------------------------------------------+



In [161]:


(
    df.withColumn("quarter", f.quarter("date"))
    .withColumn("year", f.year("date"))
    .groupBy("quarter")
    .pivot("year")
    .agg(f.round(f.avg(f.when(df.weather == 'rain', 1).otherwise(0)) * 100, 2))
    .show()
)


+-------+-----+-----+----+----+
|quarter| 2012| 2013|2014|2015|
+-------+-----+-----+----+----+
|      1|59.34|63.33| 0.0|1.11|
|      3|19.57| 2.17|2.17|2.17|
|      4|70.65| 1.09|1.09|1.09|
|      2|59.34|  0.0| 0.0| 1.1|
+-------+-----+-----+----+----+



In [163]:
# For each year, find what percentage of days it rained (had non-zero precipitation).
(
    df.withColumn("year", f.year("date"))
    .groupby('year')
    .agg(f.round(f.avg(f.when(df.precipitation > 0, 1).otherwise(0)) * 100, 2))
    .show()
)

+----+----------------------------------------------------------------------+
|year|round((avg(CASE WHEN (precipitation > 0) THEN 1 ELSE 0 END) * 100), 2)|
+----+----------------------------------------------------------------------+
|2012|                                                                 48.36|
|2013|                                                                 41.64|
|2014|                                                                  41.1|
|2015|                                                                 39.45|
+----+----------------------------------------------------------------------+

