In [1]:
import os

import pyspark
from pyspark.conf import SparkConf
from pyspark.context import SparkContext
from pyspark.sql import SparkSession
from pyspark.sql import functions as F
from pyspark.sql.functions import col

In [2]:
# Path to the GCP service-account key file.
# Prefer the standard GOOGLE_APPLICATION_CREDENTIALS environment variable so the
# notebook is not tied to one machine; fall back to the original local path
# when the variable is unset or empty.
credentials_location = (
    os.environ.get('GOOGLE_APPLICATION_CREDENTIALS')
    or '/Users/anzelam/ac/google/ac.json'
)

# Local Spark configuration: all cores, GCS connector jar on the classpath,
# and service-account authentication pointing at the key file above.
conf = SparkConf() \
    .setMaster('local[*]') \
    .setAppName('GCSFilesRead') \
    .set("spark.jars", "../lib/gcs-connector-hadoop3-2.2.5.jar") \
    .set("spark.hadoop.google.cloud.auth.service.account.enable", "true") \
    .set("spark.hadoop.google.cloud.auth.service.account.json.keyfile", credentials_location)

In [3]:
# Start the SparkContext from the configuration built in the previous cell.
sc = SparkContext(conf=conf)

# Hadoop-level settings that register the gs:// filesystem implementations and
# point the connector at the service-account key. Applied in this order.
gcs_hadoop_settings = {
    "fs.AbstractFileSystem.gs.impl": "com.google.cloud.hadoop.fs.gcs.GoogleHadoopFS",
    "fs.gs.impl": "com.google.cloud.hadoop.fs.gcs.GoogleHadoopFileSystem",
    "fs.gs.auth.service.account.json.keyfile": credentials_location,
    "fs.gs.auth.service.account.enable": "true",
}

hadoop_conf = sc._jsc.hadoopConfiguration()
for setting_name, setting_value in gcs_hadoop_settings.items():
    hadoop_conf.set(setting_name, setting_value)

24/04/14 11:18:50 WARN Utils: Your hostname, Anzela--MacBook-Air.local resolves to a loopback address: 127.0.0.1; using 192.168.7.108 instead (on interface en0)
24/04/14 11:18:50 WARN Utils: Set SPARK_LOCAL_IP if you need to bind to another address
24/04/14 11:18:50 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).


In [4]:
# Build (or reuse) a SparkSession that carries the running context's configuration.
session_builder = SparkSession.builder.config(conf=sc.getConf())
spark = session_builder.getOrCreate()

In [5]:
# Load the raw race results from GCS; the first CSV line holds the column names.
raw_csv_path = 'gs://de-running-project-bucket/raw/TWO_CENTURIES_OF_UM_RACES.csv'
running_df = spark.read.csv(raw_csv_path, header=True)

In [6]:
# Peek at the first row to check that the header line was used for column names.
running_df.head()

Row(Year of event='2018', Event dates='06.01.2018', Event name='Selva Costera (CHI)', Event distance/length='50km', Event number of finishers='22', Athlete performance='4:51:39 h', Athlete club='Tnfrc', Athlete country='CHI', Athlete year of birth='1978.0', Athlete gender='M', Athlete age category='M35', Athlete average speed='10.286', Athlete ID='0')

In [7]:
# Original CSV header -> snake_case column name.
column_mapping = {
    "Year of event": "year_of_event",
    "Event dates": "event_dates",
    "Event name": "event_name",
    "Event distance/length": "event_distance_length",
    "Event number of finishers": "event_num_finishers",
    "Athlete performance": "athlete_performance",
    "Athlete club": "athlete_club",
    "Athlete country": "athlete_country",
    "Athlete year of birth": "athlete_year_of_birth",
    "Athlete gender": "athlete_gender",
    "Athlete age category": "athlete_age_category",
    "Athlete average speed": "athlete_average_speed",
    "Athlete ID": "athlete_id",
}

# Rename every column that has an entry in the mapping; columns without an
# entry are left as they are.
for current_name in running_df.columns:
    if current_name in column_mapping:
        running_df = running_df.withColumnRenamed(current_name, column_mapping[current_name])

In [8]:
# Print a preview of the DataFrame to check the renamed columns.
running_df.show()

+-------------+-----------+-------------------+---------------------+-------------------+-------------------+--------------------+---------------+---------------------+--------------+--------------------+---------------------+----------+
|year_of_event|event_dates|         event_name|event_distance_length|event_num_finishers|athlete_performance|        athlete_club|athlete_country|athlete_year_of_birth|athlete_gender|athlete_age_category|athlete_average_speed|athlete_id|
+-------------+-----------+-------------------+---------------------+-------------------+-------------------+--------------------+---------------+---------------------+--------------+--------------------+---------------------+----------+
|         2018| 06.01.2018|Selva Costera (CHI)|                 50km|                 22|          4:51:39 h|               Tnfrc|            CHI|               1978.0|             M|                 M35|               10.286|         0|
|         2018| 06.01.2018|Selva Costera (CHI)| 

In [9]:
# Classify each event by the unit found in 'event_distance_length':
# a number followed by one of k/K/m/M/i -> "Distance",
# a number followed by d or h -> "Time", anything else -> "Unknown".
distance_length = F.col("event_distance_length")
is_distance_event = distance_length.rlike(r"\d+[kKmMi]")
is_time_event = distance_length.rlike(r"\d+[dh]")

event_type_expr = (
    F.when(is_distance_event, "Distance")
    .when(is_time_event, "Time")
    .otherwise("Unknown")
)
running_df = running_df.withColumn("event_type", event_type_expr)

In [10]:
max_reasonable_speed = 25.0  # km/h

# Speeds at or below the threshold are kept unchanged. Every other value is
# divided by 1000, i.e. it is treated as metres per hour and rescaled to km/h.
# NOTE(review): confirm the source unit of these outliers; a factor of 1000 is
# not an m/s -> km/h conversion.
speed = F.col("athlete_average_speed")
rescaled_speed = F.when(speed <= max_reasonable_speed, speed).otherwise(speed / 1000.0)

running_df = running_df.withColumn("athlete_average_speed", rescaled_speed)

In [11]:
# Remove the trailing unit token (e.g. the " h" in "4:51:39 h") from
# 'athlete_performance'.
# Only the last space-separated token is stripped. Splitting on the first space
# would also discard the rest of any value that contains more than one space,
# which goes beyond removing the unit.
running_df = running_df.withColumn(
    'athlete_performance',
    F.regexp_replace(F.col('athlete_performance'), r'\s+\S*\s*$', ''),
)

In [20]:
# Register the DataFrame as the temp view 'running_data' so it can be queried with spark.sql.
running_df.createOrReplaceTempView('running_data')

In [33]:
# Derive a numeric 'distance_in_km' column from 'event_distance_length':
#   "<number>km..." -> the number as-is
#   "<number>mi..." -> the number converted to kilometres (1 mi = 1.609344 km)
#   anything else   -> NULL (no distance can be derived)
# The column is DOUBLE. NULL is used instead of an 'Unknown' string so the
# column stays numeric.
# The patterns use [0-9] and [.] rather than \d and \. to avoid backslash
# escaping inside the SQL string literal.
sql_query = """
SELECT
    *,
    CASE
        WHEN LOWER(event_distance_length) RLIKE '^[0-9]+([.][0-9]+)?[ ]*km'
            THEN CAST(REGEXP_EXTRACT(event_distance_length, '^([0-9]+([.][0-9]+)?)', 1) AS DOUBLE)
        WHEN LOWER(event_distance_length) RLIKE '^[0-9]+([.][0-9]+)?[ ]*mi'
            THEN CAST(REGEXP_EXTRACT(event_distance_length, '^([0-9]+([.][0-9]+)?)', 1) AS DOUBLE) * 1.609344
    END AS distance_in_km
FROM running_data
"""

result_df = spark.sql(sql_query)
result_df.show()

+-------------+-----------+-------------------+---------------------+-------------------+-------------------+--------------------+---------------+---------------------+--------------+--------------------+---------------------+----------+----------+--------------+
|year_of_event|event_dates|         event_name|event_distance_length|event_num_finishers|athlete_performance|        athlete_club|athlete_country|athlete_year_of_birth|athlete_gender|athlete_age_category|athlete_average_speed|athlete_id|event_type|distance_in_km|
+-------------+-----------+-------------------+---------------------+-------------------+-------------------+--------------------+---------------+---------------------+--------------+--------------------+---------------------+----------+----------+--------------+
|         2018| 06.01.2018|Selva Costera (CHI)|                 50km|                 22|            4:51:39|               Tnfrc|            CHI|               1978.0|             M|                 M35|    

In [35]:
# Write the result to the staging area as Parquet, repartitioned into 10 partitions.
# mode('overwrite') keeps the cell re-runnable: with the default save mode the
# write raises once the target path already exists. Existing data under the
# target path is replaced.
result_df.repartition(10).write.mode('overwrite').parquet('gs://de-running-project-bucket/stage/')

24/04/14 12:13:53 WARN MemoryManager: Total allocation exceeds 95,00% (1 020 054 720 bytes) of heap memory
Scaling row group sizes to 95,00% for 8 writers
                                                                                

In [36]:
# Shut down the Spark session once the Parquet write has finished.
spark.stop()

In [37]:
# sc.stop()  # left disabled: spark.stop() already stops the underlying SparkContext