In [1]:
import pyspark
from pyspark.sql import SparkSession
from pyspark.sql import functions as F 
from pyspark.sql import types
from pyspark.conf import SparkConf
from pyspark.context import SparkContext
from config_variables import var_credentials_location, var_gcs_connector
import requests
import pandas as pd
from io import StringIO




In [2]:
# Spark configuration: local master, GCS connector jar on the classpath, and
# service-account authentication using the key file from config_variables.
conf = (
    SparkConf()
    .setMaster('local[*]')
    .setAppName('Data Cleaning + Loading to BQ')
    .setAll([
        ("spark.jars", var_gcs_connector),
        ("spark.hadoop.google.cloud.auth.service.account.enable", "true"),
        ("spark.hadoop.google.cloud.auth.service.account.json.keyfile", var_credentials_location),
    ])
)

In [3]:
# Configuring Hadoop
#
# getOrCreate() returns the already-running SparkContext when this cell is
# re-executed; calling the SparkContext constructor a second time in the same
# kernel raises "Cannot run multiple SparkContexts at once".
sc = SparkContext.getOrCreate(conf=conf)

# Hadoop configuration of the underlying JVM context (reached through the
# private _jsc handle).
hadoop_conf = sc._jsc.hadoopConfiguration()

# Register the GCS connector classes for the gs:// scheme and point them at
# the service-account key file.
hadoop_conf.set("fs.AbstractFileSystem.gs.impl",  "com.google.cloud.hadoop.fs.gcs.GoogleHadoopFS")
hadoop_conf.set("fs.gs.impl", "com.google.cloud.hadoop.fs.gcs.GoogleHadoopFileSystem")
hadoop_conf.set("fs.gs.auth.service.account.json.keyfile", var_credentials_location)
hadoop_conf.set("fs.gs.auth.service.account.enable", "true")

23/08/10 11:07:19 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).


In [4]:
# Create the SparkSession (or fetch the existing one), seeded with the
# configuration of the SparkContext built earlier.
spark = (
    SparkSession.builder
    .config(conf=sc.getConf())
    .getOrCreate()
)

In [5]:
# Sanity check: display the type of the session object.
type(spark)

pyspark.sql.session.SparkSession

23/08/10 11:07:35 WARN GarbageCollectionMetrics: To enable non-built-in garbage collector(s) List(G1 Concurrent GC), users should configure it(them) to spark.eventLog.gcMetrics.youngGenerationGarbageCollectors or spark.eventLog.gcMetrics.oldGenerationGarbageCollectors


In [6]:
# (column name, Spark type) pairs, in the order the schema is applied when
# reading the CSV files. Every field is declared nullable.
bike_fields = [
    ('Year', types.IntegerType()),
    ('UnqID', types.StringType()),
    ('Date', types.StringType()),
    ('Weather', types.StringType()),
    ('Time', types.TimestampType()),
    ('Day', types.StringType()),
    ('Round', types.StringType()),
    ('Dir', types.StringType()),
    ('Path', types.StringType()),
    ('Mode', types.StringType()),
    ('Count', types.IntegerType()),
]

bike_schema = types.StructType(
    [types.StructField(field_name, field_type, True) for field_name, field_type in bike_fields]
)


In [7]:
# Load the raw CSV files from the GCS bucket into a Spark DataFrame, treating
# the first row as a header and applying the explicit bike_schema.
df_test = spark.read.csv(
    "gs://pipelineproject01-data-bronze-bucket/raw/2015/Central/*",
    schema=bike_schema,
    header=True,
)


In [8]:
# Columns kept after cleaning, in output order.
final_columns = ["Date", "UUID", "Weather", "Time", "Day", "Area", "Count"]

# Parse Date from dd/MM/yyyy text, reduce Time to an HH:mm:ss string, rename
# UnqID to UUID, tag every row with the "Central" area, then keep only the
# final columns.
df_test = (
    df_test
    .withColumn("Date", F.to_date(F.col("Date"), "dd/MM/yyyy"))
    .withColumn("Time", F.date_format(F.col("Time"), "HH:mm:ss"))
    .withColumnRenamed("UnqID", "UUID")
    .withColumn("Area", F.lit("Central"))
    .select(*final_columns)
)
        

In [9]:
# Expose the cleaned frame to Spark SQL under the name "test_data".
df_test.createOrReplaceTempView("test_data")

# Collapse the free-text Weather values into four buckets; anything not listed
# (including NULL) becomes 'Unknown'.
#
# Notes:
#   * Matching is exact (case- and whitespace-sensitive), which is why entries
#     such as 'Wet ' and ' Dry' are listed separately from 'Wet' and 'Dry'.
#   * All literals use single quotes: double-quoted literals are parsed as
#     identifiers when spark.sql.ansi.doubleQuotedIdentifiers is enabled.
#   * Each value is listed once. 'Cold/cloudy' lives only in the 'Cloudy'
#     bucket, because the first matching WHEN branch wins and a second listing
#     further down could never be reached.
#   * No trailing ';' - a single statement does not need one, and older Spark
#     parsers reject it.
df_test2 = spark.sql("""
select
        UUID
        ,Area
        ,Date
        ,Time
        ,Day
        ,CASE
            WHEN Weather IN
                ('Dry', 'Dry/hot', 'Dry Warm', 'Dry & Windy', 'Dry Chill', 'Dry/cold', 'Sunny Dry'
                ,'Sun', 'Bright', 'Sunny', 'Sunny Overcast', 'Sunny/cloudy', 'Partly Sunny', 'Bright + Cloudy'
                ,'Sun Setting', 'Sunny + Cloudy', 'Fine/dry', 'Good', 'Fine', 'Dry              ...'
                ,'Dry/ Sunny', 'Cloudy/sunny', 'Warm + Dry', 'Druy', 'Dry/Sunny', 'Dry/sun', 'Fine + Dry'
                ,'Dry/good', 'Dry/mild', 'Fine + Hot', 'Fair', 'Dry/sunny', ' Dry')
            THEN 'Clear/Sunny'
            WHEN Weather IN
                ('Cloudy', 'Cloudy/dry', 'Overcast', 'Cloudy Sunny', 'Cloudy + Sunny', 'Cold/cloudy'
                ,'Dry/overcast', 'Dark/cloudy', 'Dark Cloudy', 'Dull')
            THEN 'Cloudy'
            WHEN Weather IN
                ('Wet/damp', 'Wet ', 'Very Wet', 'Damp', 'Drizzle', 'Light Shrs', 'Mizzle'
                ,'V. Light Drizzle', 'Light Rain', 'Drizzly]', 'Drtizzly', 'Drizzel', 'Drizzly'
                ,'Light Showers', 'Dry Wet Road', 'Mix Wet/dry'
                ,'S.wet', 'S/w', 'Wet', 'Wet/dry', 'Showers', 'S. Wet', 'Shower', 'Some Showers'
                ,'Road Wet', 'Dry/wet')
            THEN 'Light Rain'
            WHEN Weather IN
                ('Windy/rain', 'Windy + Sunny', 'Wind/ Showers', 'Windy'
                ,'Rain & Cloudy', 'Windy/wet', 'Rain & Windy', 'Very Windy', 'Cloudy + Rain'
                ,'Cloudy/rain/sunny', 'Sunsetting + Windy', 'High Wind', 'Cold'
                ,'Cloudy/rain', 'Rain', 'Wet And Windy', 'Dry/windy', 'Dry Very Windy', 'Raining'
                ,'Wet/windy', 'Dry Cold', 'Rain/cloudy', 'Heavy Rain', 'Cold/sunny', 'Cloudy/windy'
                ,'Wet/v.windy', 'Rains', 'Dark/dry', 'Dry/dark', 'Dark Dry', 'Dry Dark')
            THEN 'Windy/Rainy/Cold'
            ELSE 'Unknown'
        END AS GroupedWeather
        ,Count
from
        test_data
""")


In [None]:
# Sanity check: display the type of the object returned by spark.sql.
type(df_test2)

pyspark.sql.dataframe.DataFrame

23/08/10 12:43:51 WARN HeartbeatReceiver: Removing executor driver with no recent heartbeats: 125441 ms exceeds timeout 120000 ms
23/08/10 12:43:51 WARN SparkContext: Killing executors is not supported by current scheduler.
23/08/10 12:43:57 ERROR Inbox: Ignoring error
org.apache.spark.SparkException: Exception thrown in awaitResult: 
	at org.apache.spark.util.ThreadUtils$.awaitResult(ThreadUtils.scala:322)
	at org.apache.spark.rpc.RpcTimeout.awaitResult(RpcTimeout.scala:75)
	at org.apache.spark.rpc.RpcEnv.setupEndpointRefByURI(RpcEnv.scala:102)
	at org.apache.spark.rpc.RpcEnv.setupEndpointRef(RpcEnv.scala:110)
	at org.apache.spark.util.RpcUtils$.makeDriverRef(RpcUtils.scala:36)
	at org.apache.spark.storage.BlockManagerMasterEndpoint.driverEndpoint$lzycompute(BlockManagerMasterEndpoint.scala:117)
	at org.apache.spark.storage.BlockManagerMasterEndpoint.org$apache$spark$storage$BlockManagerMasterEndpoint$$driverEndpoint(BlockManagerMasterEndpoint.scala:116)
	at org.apache.spark.storage.B

### Dataset Info

UnqID - Unique identifier (UUID) for London locations; renamed to `UUID` in the cleaning step

Count - Disaggregated by site, direction, 15-min period and by path and mode classifications that vary depending on the area and year

Mode - Drop this column

Path - Drop this column

In [7]:
# Display the active SparkSession object as the cell output.
spark

In [8]:
# Left disabled on purpose; uncomment to stop the Spark session when finished.
# spark.stop()