In [12]:
from pyspark.sql import SparkSession
from pyspark import SparkConf
from pyspark.sql.types import StructType, StructField, StringType, LongType, DoubleType, DateType
from pyspark.sql.functions import *

#set up spark (session)
sparkConf = SparkConf()
sparkConf.setMaster("spark://spark-master:7077")
sparkConf.setAppName("BatchPipeline")
sparkConf.set("spark.driver.memory", "2g")
sparkConf.set("spark.executor.cores", "1")
sparkConf.set("spark.driver.cores", "1")

spark = SparkSession.builder.config(conf=sparkConf).getOrCreate()

#set up hadoop fs configuration
conf = spark.sparkContext._jsc.hadoopConfiguration()
conf.set("fs.gs.impl", "com.google.cloud.hadoop.fs.gcs.GoogleHadoopFileSystem")
conf.set("fs.AbstractFileSystem.gs.impl", "com.google.cloud.hadoop.fs.gcs.GoogleHadoopFS")

#retrieve data from bucket
#google storage file path
#make sure this is a separate bucket that only has the correct files in there
gsc_file_path = 'gs://de_jads_batch_data2/' # bucket name with whodata.csv file

dataSchema = StructType(
    [StructField("ParentLocation", StringType(), True),
    StructField("Location", StringType(), True),
    StructField("Period", LongType(), True),
    StructField("isLatestYear", StringType(), True),
    StructField("Dim1", StringType(), True),
    StructField("FactValueNumeric", DoubleType(), True)
    ])

consumed_import = spark.read.format("csv").schema(dataSchema).option("header", "true") \
    .load(gsc_file_path+'whodata.csv')

consumed_import.printSchema()

root
 |-- ParentLocation: string (nullable = true)
 |-- Location: string (nullable = true)
 |-- Period: long (nullable = true)
 |-- isLatestYear: string (nullable = true)
 |-- Dim1: string (nullable = true)
 |-- FactValueNumeric: double (nullable = true)



In [13]:
consumed = consumed_import.select(col('Location').alias('Country') , col('Period').alias('Year'), col('FactValueNumeric').alias('Litres_alcohol')) \
    .where(col('Dim1') == 'All types')\
    .sort('Year') \

consumed.show()

+--------------------+----+--------------+
|             Country|Year|Litres_alcohol|
+--------------------+----+--------------+
|United States of ...|1960|          7.83|
|             Austria|1960|          8.91|
|         Afghanistan|1961|           0.0|
|              Guinea|1961|          0.24|
|Micronesia (Feder...|1961|           0.0|
|                Oman|1961|           0.0|
|          Bangladesh|1961|           0.0|
|              Kuwait|1961|          0.02|
|               Nepal|1961|           0.0|
|          Mauritania|1961|          0.11|
|               Qatar|1961|           0.0|
|        Saudi Arabia|1961|           0.0|
|            Pakistan|1961|          0.01|
|           Indonesia|1961|          0.03|
|              Malawi|1961|          0.05|
|             Somalia|1961|          0.06|
|             Comoros|1961|          0.12|
|               Niger|1961|          0.13|
|Iran (Islamic Rep...|1961|          0.14|
|              Jordan|1961|          0.14|
+----------

In [14]:
yearly_ww_consumed = consumed.select('Year', 'Litres_alcohol') \
    .groupBy('Year') \
    .agg(avg('Litres_alcohol').alias('Litres_alcohol')) \
    .withColumn("Country", lit("Worldwide"))

yearly_ww_consumed.show()

+----+------------------+---------+
|Year|    Litres_alcohol|  Country|
+----+------------------+---------+
|1960| 8.370000000000001|Worldwide|
|1961|3.7132867132867133|Worldwide|
|1962| 3.817260273972604|Worldwide|
|1963|3.9981045751633966|Worldwide|
|1964| 4.081307189542484|Worldwide|
|1965|4.1433333333333335|Worldwide|
|1966| 4.286233766233765|Worldwide|
|1967| 4.313701298701301|Worldwide|
|1968| 4.387272727272727|Worldwide|
|1969| 4.461233766233767|Worldwide|
|1970| 4.644774193548388|Worldwide|
|1971| 4.726129032258064|Worldwide|
|1972| 4.818774193548387|Worldwide|
|1973| 5.024935897435899|Worldwide|
|1974| 5.041730769230769|Worldwide|
|1975|  5.10294871794872|Worldwide|
|1976| 5.168701298701301|Worldwide|
|1977| 5.180064516129034|Worldwide|
|1978| 5.149350649350646|Worldwide|
|1979| 5.139032258064514|Worldwide|
+----+------------------+---------+
only showing top 20 rows



In [15]:
joinExpression = ["Year", "Country", "Litres_alcohol"]
consumption = consumed.join(yearly_ww_consumed, joinExpression, "full")

consumption.show(10)

+----+--------------------+-----------------+
|Year|             Country|   Litres_alcohol|
+----+--------------------+-----------------+
|1960|United States of ...|             7.83|
|1960|           Worldwide|8.370000000000001|
|1960|             Austria|             8.91|
|1961|         Afghanistan|              0.0|
|1961|          Bangladesh|              0.0|
|1961|Micronesia (Feder...|              0.0|
|1961|               Nepal|              0.0|
|1961|                Oman|              0.0|
|1961|               Qatar|              0.0|
|1961|        Saudi Arabia|              0.0|
+----+--------------------+-----------------+
only showing top 10 rows



In [12]:
#write data to bucket
consumption.write.mode("overwrite").format("csv").save("gs://jadsdenb/consumed") # bucket name !

In [16]:
#use the cloud storage bucket for temporary BigQuery export data used by the connector
bucket = "de_jads_temp_annelies" # bucket name !
spark.conf.set('temporaryGcsBucket', bucket)

#save the combined matches data to BigQuery -> do not forget to change project ID
consumption.write.format('bigquery') \
    .option('table', 'de2022-362620.assignment2dataset.consumption') \
    .mode("overwrite") \
    .save()

In [17]:
# stop the spark context
spark.stop()