In [1]:
!pip install pyspark

Collecting pyspark
  Downloading pyspark-3.5.0.tar.gz (316.9 MB)
[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m316.9/316.9 MB[0m [31m2.8 MB/s[0m eta [36m0:00:00[0m
[?25h  Preparing metadata (setup.py) ... [?25l[?25hdone
Building wheels for collected packages: pyspark
  Building wheel for pyspark (setup.py) ... [?25l[?25hdone
  Created wheel for pyspark: filename=pyspark-3.5.0-py2.py3-none-any.whl size=317425345 sha256=83d0e13370371956acc425d416cb92a294b9ea2ea028967ae36b7c65e6f77e66
  Stored in directory: /root/.cache/pip/wheels/41/4e/10/c2cf2467f71c678cfc8a6b9ac9241e5e44a01940da8fbb17fc
Successfully built pyspark
Installing collected packages: pyspark
Successfully installed pyspark-3.5.0


In [2]:
from pyspark.sql import SparkSession
from google.colab import drive
from pyspark.sql import functions as F

In [3]:
drive.mount('/content/gdrive')

Mounted at /content/gdrive


In [11]:
class ETLJob:

    def __init__(self):
        self.spark_session = SparkSession.builder.master("local[*]").getOrCreate()

    def extract(self,input_path):

        df=self.spark_session.read.csv(input_path,inferSchema=True,header=True)

        return df


    def transform(self,df):
        df=df.select(['Location','Brew_Date','Volume_Produced'])\
        .withColumn('Brew_Month', F.date_format('Brew_Date', "yyyy-MM"))\
        .groupBy('Brew_Month')\
        .pivot('Location')\
        .sum('Volume_Produced')\
        .sort('Brew_Month')

        return df

    def load(self, df,output_path):
        df.write.csv(output_path,header=True,mode='overwrite')

    def run(self,input_path,output_path):
        return self.load(self.transform(self.extract(input_path)),output_path)


In [12]:
job=ETLJob()
job.run('gdrive/My Drive/Colab Notebooks/brewery_data_complete_extended.csv',\
        'gdrive/My Drive/Colab Notebooks/brewery_data_output.csv')


In [13]:
spark_session = SparkSession.builder.master("local[*]").getOrCreate()

In [14]:
df=spark_session.read.csv('gdrive/My Drive/Colab Notebooks/brewery_data_output.csv',inferSchema=True,header=True)

In [15]:
df.show()

+-------------------+---------------+----------+-----------+---------+-----------+-----------+------------+-----------+----------+---------+
|         Brew_Month|Electronic City|HSR Layout|Indiranagar|Jayanagar|Koramangala|Malleswaram|Marathahalli|Rajajinagar|Whitefield|Yelahanka|
+-------------------+---------------+----------+-----------+---------+-----------+-----------+------------+-----------+----------+---------+
|2020-01-01 00:00:00|       58157063|  57791779|   58476841| 59240364|   58033941|   58244295|    57765712|   58559281|  59268304| 58472454|
|2020-02-01 00:00:00|       54036768|  53959900|   54539968| 54237101|   54791788|   54492559|    54298376|   54987598|  54299030| 55047953|
|2020-03-01 00:00:00|       57938008|  58488866|   58617914| 58510168|   57998335|   58202733|    58370414|   58290870|  58434380| 58100744|
|2020-04-01 00:00:00|       57240763|  56297919|   56936309| 57233419|   56560284|   56095060|    56291880|   55730412|  56683627| 56855324|
|2020-05-01 0