In [38]:
import pandas as pd
import numpy as np
import pyspark
import pyspark.sql.functions as sf
import warnings
warnings.filterwarnings("ignore")
from pyspark.sql.functions import month, year, quarter
from pyspark.sql.functions import concat, sum, avg, min, max, count, mean
from pyspark.sql.functions import asc, desc

In [39]:
# Reuse the already-running SparkSession if there is one, otherwise create it.
spark = pyspark.sql.SparkSession.builder.getOrCreate()

In [90]:
# Bare expression: show the session object as this cell's output.
spark

In [96]:
# Build a single-column Spark DataFrame from a small pandas frame of
# language names.
language_names = ['Python', 'Java', 'C++', 'Go', 'SQL', 'SAP', 'javascript']
languages_pdf = pd.DataFrame({'language': language_names})
df = spark.createDataFrame(languages_pdf)

In [97]:
# Print the rows of `df` as a text table.
df.show()

+----------+
|  language|
+----------+
|    Python|
|      Java|
|       C++|
|        Go|
|       SQL|
|       SAP|
|javascript|
+----------+



In [98]:
# Print each column's name, type and nullability.
df.printSchema()

root
 |-- language: string (nullable = true)



In [99]:
# Number of rows in `df`.
df.count()

7

In [100]:
# Print only the first 5 rows.
df.show(5)

+--------+
|language|
+--------+
|  Python|
|    Java|
|     C++|
|      Go|
|     SQL|
+--------+
only showing top 5 rows



In [45]:
from pydataset import data

In [46]:
# Fetch the 'mpg' dataset through the `data` loader, then hand it to Spark.
mpg_source = data('mpg')
mpg = spark.createDataFrame(mpg_source)

In [47]:
# Assemble one sentence per row by concatenating literal text with the
# year, manufacturer and cyl columns, and expose it as column `result`.
sentence_parts = [
    sf.lit("The "),
    mpg.year,
    sf.lit(" "),
    mpg.manufacturer,
    sf.lit(" has a "),
    mpg.cyl,
    sf.lit(" cylinder engine."),
]
dft = mpg.select(sf.concat(*sentence_parts).alias('result'))

# truncate=False keeps the full sentence visible instead of cutting it off.
dft.show(truncate=False)

+-------------------------------------------+
|result                                     |
+-------------------------------------------+
|The 1999 audi has a 4 cylinder engine.     |
|The 1999 audi has a 4 cylinder engine.     |
|The 2008 audi has a 4 cylinder engine.     |
|The 2008 audi has a 4 cylinder engine.     |
|The 1999 audi has a 6 cylinder engine.     |
|The 1999 audi has a 6 cylinder engine.     |
|The 2008 audi has a 6 cylinder engine.     |
|The 1999 audi has a 4 cylinder engine.     |
|The 1999 audi has a 4 cylinder engine.     |
|The 2008 audi has a 4 cylinder engine.     |
|The 2008 audi has a 4 cylinder engine.     |
|The 1999 audi has a 6 cylinder engine.     |
|The 1999 audi has a 6 cylinder engine.     |
|The 2008 audi has a 6 cylinder engine.     |
|The 2008 audi has a 6 cylinder engine.     |
|The 1999 audi has a 6 cylinder engine.     |
|The 2008 audi has a 6 cylinder engine.     |
|The 2008 audi has a 8 cylinder engine.     |
|The 2008 chevrolet has a 8 cylind

In [48]:
# Remove the parenthesised part of `trans` (the regex matches "(" ... ")")
# and keep the column name unchanged.
trans_without_parens = sf.regexp_replace("trans", r"\(.+\)", "").alias("trans")
mpg.select(trans_without_parens).show(5)

+------+
| trans|
+------+
|  auto|
|manual|
|manual|
|  auto|
|  auto|
+------+
only showing top 5 rows



In [49]:
# Fetch the 'tips' dataset through the `data` loader, then hand it to Spark.
tips_source = data('tips')
tips = spark.createDataFrame(tips_source)

In [50]:
# Preview the tips data as a text table.
tips.show()

+----------+----+------+------+---+------+----+
|total_bill| tip|   sex|smoker|day|  time|size|
+----------+----+------+------+---+------+----+
|     16.99|1.01|Female|    No|Sun|Dinner|   2|
|     10.34|1.66|  Male|    No|Sun|Dinner|   3|
|     21.01| 3.5|  Male|    No|Sun|Dinner|   3|
|     23.68|3.31|  Male|    No|Sun|Dinner|   2|
|     24.59|3.61|Female|    No|Sun|Dinner|   4|
|     25.29|4.71|  Male|    No|Sun|Dinner|   4|
|      8.77| 2.0|  Male|    No|Sun|Dinner|   2|
|     26.88|3.12|  Male|    No|Sun|Dinner|   4|
|     15.04|1.96|  Male|    No|Sun|Dinner|   2|
|     14.78|3.23|  Male|    No|Sun|Dinner|   2|
|     10.27|1.71|  Male|    No|Sun|Dinner|   2|
|     35.26| 5.0|Female|    No|Sun|Dinner|   4|
|     15.42|1.57|  Male|    No|Sun|Dinner|   2|
|     18.43| 3.0|  Male|    No|Sun|Dinner|   4|
|     14.83|3.02|Female|    No|Sun|Dinner|   2|
|     21.58|3.92|  Male|    No|Sun|Dinner|   2|
|     10.33|1.67|Female|    No|Sun|Dinner|   3|
|     16.29|3.71|  Male|    No|Sun|Dinne

In [51]:
# Proportion of observations per smoker status.
# The denominator is taken from the data instead of the hard-coded 244, so
# the shares stay correct if the dataset ever has a different number of rows.
n_tips = tips.count()
s = tips.groupBy('smoker').agg(sf.count(tips.smoker) / n_tips)
s.show()

+------+---------------------+
|smoker|(count(smoker) / 244)|
+------+---------------------+
|    No|   0.6188524590163934|
|   Yes|  0.38114754098360654|
+------+---------------------+



In [52]:
# Add `tip_pct`: the tip as a percentage of the total bill.
# Same evaluation order as the SQL text 'tip / total_bill * 100'.
tip_share_pct = tips.tip / tips.total_bill * 100
tipspc = tips.withColumn('tip_pct', tip_share_pct)
tipspc.show()

+----------+----+------+------+---+------+----+------------------+
|total_bill| tip|   sex|smoker|day|  time|size|           tip_pct|
+----------+----+------+------+---+------+----+------------------+
|     16.99|1.01|Female|    No|Sun|Dinner|   2|5.9446733372572105|
|     10.34|1.66|  Male|    No|Sun|Dinner|   3|16.054158607350097|
|     21.01| 3.5|  Male|    No|Sun|Dinner|   3|16.658733936220845|
|     23.68|3.31|  Male|    No|Sun|Dinner|   2| 13.97804054054054|
|     24.59|3.61|Female|    No|Sun|Dinner|   4|14.680764538430255|
|     25.29|4.71|  Male|    No|Sun|Dinner|   4| 18.62396204033215|
|      8.77| 2.0|  Male|    No|Sun|Dinner|   2| 22.80501710376283|
|     26.88|3.12|  Male|    No|Sun|Dinner|   4|11.607142857142858|
|     15.04|1.96|  Male|    No|Sun|Dinner|   2|13.031914893617023|
|     14.78|3.23|  Male|    No|Sun|Dinner|   2|21.853856562922868|
|     10.27|1.71|  Male|    No|Sun|Dinner|   2| 16.65043816942551|
|     35.26| 5.0|Female|    No|Sun|Dinner|   4|14.180374361883

In [87]:
# Mean tip percentage for every sex / smoker combination, rounded to
# 4 decimal places and reported as `tips_pct`.
rounded_mean_tip_pct = sf.round(sf.mean("tip_pct"), 4).alias("tips_pct")
tipspc.groupBy("sex", "smoker").agg(rounded_mean_tip_pct).show()

+------+------+--------+
|   sex|smoker|tips_pct|
+------+------+--------+
|  Male|    No| 16.0669|
|Female|    No| 15.6921|
|  Male|   Yes| 15.2771|
|Female|   Yes|  18.215|
+------+------+--------+



In [54]:
# Imported under an alias so it does not rebind `data`, which already refers
# to pydataset's loader used by the mpg / tips cells; re-running those cells
# after this one would otherwise call the wrong loader.
from vega_datasets import data as vega_data

# Load the Seattle weather dataset and convert it to a Spark DataFrame.
weather = spark.createDataFrame(vega_data.seattle_weather())
weather.show()

+-------------------+-------------+--------+--------+----+-------+
|               date|precipitation|temp_max|temp_min|wind|weather|
+-------------------+-------------+--------+--------+----+-------+
|2012-01-01 00:00:00|          0.0|    12.8|     5.0| 4.7|drizzle|
|2012-01-02 00:00:00|         10.9|    10.6|     2.8| 4.5|   rain|
|2012-01-03 00:00:00|          0.8|    11.7|     7.2| 2.3|   rain|
|2012-01-04 00:00:00|         20.3|    12.2|     5.6| 4.7|   rain|
|2012-01-05 00:00:00|          1.3|     8.9|     2.8| 6.1|   rain|
|2012-01-06 00:00:00|          2.5|     4.4|     2.2| 2.2|   rain|
|2012-01-07 00:00:00|          0.0|     7.2|     2.8| 2.3|   rain|
|2012-01-08 00:00:00|          0.0|    10.0|     2.8| 2.0|    sun|
|2012-01-09 00:00:00|          4.3|     9.4|     5.0| 3.4|   rain|
|2012-01-10 00:00:00|          1.0|     6.1|     0.6| 3.4|   rain|
|2012-01-11 00:00:00|          0.0|     6.1|    -1.1| 5.1|    sun|
|2012-01-12 00:00:00|          0.0|     6.1|    -1.7| 1.9|    

In [55]:
def _to_fahrenheit(celsius):
    """Apply the Celsius -> Fahrenheit formula (x * 9 / 5 + 32) to a column."""
    return celsius * 9 / 5 + 32


# Show the max / min temperatures converted to Fahrenheit for the first 5 rows.
weather.select(
    _to_fahrenheit(weather.temp_max).alias('temp_max_f'),
    _to_fahrenheit(weather.temp_min).alias('temp_min_f'),
).show(5)

+----------+----------+
|temp_max_f|temp_min_f|
+----------+----------+
|     55.04|      41.0|
|     51.08|     37.04|
|     53.06|     44.96|
|     53.96|     42.08|
|     48.02|     37.04|
+----------+----------+
only showing top 5 rows



In [58]:
# Tag every row with the calendar month of `date`, then rank the months by
# mean precipitation, wettest first.
weather = weather.withColumn("month", month("date"))
monthly_rainfall = weather.groupBy("month").agg(
    mean("precipitation").alias("rainfall")
)
monthly_rainfall.orderBy(desc("rainfall")).show()

+-----+-------------------+
|month|           rainfall|
+-----+-------------------+
|   11|  5.354166666666667|
|   12|  5.021774193548388|
|    3|  4.888709677419355|
|   10|  4.059677419354839|
|    1| 3.7580645161290316|
|    2|  3.734513274336283|
|    4|  3.128333333333333|
|    9| 1.9624999999999997|
|    5| 1.6733870967741935|
|    8| 1.3201612903225806|
|    6| 1.1075000000000002|
|    7|0.38870967741935486|
+-----+-------------------+



In [61]:
# Tag every row with the year of `date`, then rank the years by mean wind,
# highest first.
weather = weather.withColumn("year", year("date"))
yearly_wind = weather.groupBy("year").agg(mean("wind").alias("windiest"))
yearly_wind.orderBy(sf.col("windiest").desc()).show()

+----+------------------+
|year|          windiest|
+----+------------------+
|2012| 3.400819672131148|
|2014|3.3876712328767122|
|2015|3.1597260273972596|
|2013|3.0158904109589044|
+----+------------------+



In [77]:
# Count January observations per weather label.
# The month is derived from `date` inside the filter, so this cell neither
# re-creates the `month` column nor depends on an earlier cell having added it.
weather1 = weather.filter(month("date") == 1)
weather1.groupBy("weather").agg(count('weather')).show()

+-------+--------------+
|weather|count(weather)|
+-------+--------------+
|drizzle|            10|
|   rain|            35|
|    sun|            33|
|   snow|             8|
|    fog|            38|
+-------+--------------+



In [85]:
 # Percentage of 2015 third-quarter rows whose weather label is 'rain'.
 # Averaging a 0/1 flag gives the fraction of matching rows; * 100 turns it
 # into a percentage.
 (weather.where(year("date") == 2015)
     .where(quarter("date") == 3)
     .withColumn("rainy", (sf.col("weather") == "rain").cast("int"))
     .groupBy(quarter("date"))
     .agg(mean("rainy") * 100)
     .show())

+-------------+------------------+
|quarter(date)|(avg(rainy) * 100)|
+-------------+------------------+
|            3|2.1739130434782608|
+-------------+------------------+



In [86]:
# Same computation as the preceding cell, with the helper flag column named
# `raining`: percentage of 2015 Q3 rows whose weather label is 'rain'.
(
    weather
    .filter((year("date") == 2015) & (quarter("date") == 3))
    .withColumn("raining", (weather["weather"] == "rain").cast("int"))
    .groupBy(quarter("date"))
    .agg(mean("raining") * 100)
    .show()
)

+-------------+--------------------+
|quarter(date)|(avg(raining) * 100)|
+-------------+--------------------+
|            3|  2.1739130434782608|
+-------------+--------------------+



In [84]:
# Percentage of rows per year with non-zero precipitation.
# `rain_day` is 1 when precipitation != 0 and 0 otherwise, so its mean is the
# fraction of such rows.
had_precipitation = (weather.precipitation != 0).cast("int")
(
    weather
    .withColumn("rain_day", had_precipitation)
    .groupBy(year("date"))
    .agg(mean("rain_day") * 100)
    .show()
)

+----------+---------------------+
|year(date)|(avg(rain_day) * 100)|
+----------+---------------------+
|      2012|    48.36065573770492|
|      2013|    41.64383561643836|
|      2014|     41.0958904109589|
|      2015|    39.45205479452055|
+----------+---------------------+

