In [3]:
# Initialization
import os
import sys

# Location of the local Spark installation and of the Python libraries bundled with it.
spark_home = "/home/talentum/spark"
pylib = spark_home + "/python/lib"
os.environ["SPARK_HOME"] = spark_home
os.environ["PYLIB"] = pylib

# Python interpreters PySpark should use
# (set both to /usr/bin/python2.7 if you want to use Python 2).
os.environ.update({
    "PYSPARK_PYTHON": "/usr/bin/python3.6",
    "PYSPARK_DRIVER_PYTHON": "/usr/bin/python3",
})

# Prepend the bundled archives to the import path. Inserting at position 0 in
# this order leaves pyspark.zip ahead of the py4j archive.
for archive in ("/py4j-0.10.7-src.zip", "/pyspark.zip"):
    sys.path.insert(0, pylib + archive)

# NOTE: List every extra package you need in PYSPARK_SUBMIT_ARGS (comma-separated).
# Single-package alternatives:
#   '--packages com.databricks:spark-xml_2.11:0.6.0 pyspark-shell'
#   '--packages org.apache.spark:spark-avro_2.11:2.4.0 pyspark-shell'
# Both packages with spark-avro 2.4.0 instead of 2.4.3:
#   '--packages com.databricks:spark-xml_2.11:0.6.0,org.apache.spark:spark-avro_2.11:2.4.0 pyspark-shell'
os.environ['PYSPARK_SUBMIT_ARGS'] = '--packages com.databricks:spark-xml_2.11:0.6.0,org.apache.spark:spark-avro_2.11:2.4.3 pyspark-shell'

In [4]:
# Entry point (Spark 2.x): obtain a Hive-enabled SparkSession.
from pyspark.sql import SparkSession

session_builder = SparkSession.builder.appName("Data cleaning").enableHiveSupport()

# On yarn: specify .master("yarn") on the builder before getOrCreate(), e.g.
#   SparkSession.builder.appName("Spark SQL basic example").enableHiveSupport().master("yarn").getOrCreate()
spark = session_builder.getOrCreate()

# SparkContext that backs the session.
sc = spark.sparkContext

In [7]:
# Imports for the ETL cells below, plus the session handle they use.
# NOTE(review): `header` is not referenced as a name in any visible cell (the
# later `header=True` occurrences are keyword arguments). This looks like an
# accidental IDE auto-import that adds a docutils dependency; confirm and drop it.
from docutils.nodes import header
from pyspark.sql import SparkSession
from pyspark.sql.types import StructType, StructField, IntegerType, StringType, FloatType
# NOTE: importing `round` here shadows Python's built-in round() for the rest of
# the notebook; every later round(...) call is the Spark column function.
from pyspark.sql.functions import col, regexp_replace, split, round

# NOTE(review): an earlier cell already built a session named "Data cleaning".
# getOrCreate() presumably returns that same session, in which case the "ETL"
# app name would not take effect — confirm.
spark = SparkSession.builder.appName("ETL").enableHiveSupport().getOrCreate()

In [8]:
# -----------------   write data from local, into HDFS   -----------------------------------

# One line per dataset: local source file URI on the left, HDFS target path on the right.
local_athlete, hdfs_athlete = "file:///home/talentum/MiniProject/athlete_events.csv", "athlete_events.csv"
local_winners, hdfs_winners = "file:///home/talentum/MiniProject/Winners.csv", "Winners.csv"
local_noc, hdfs_noc = "file:///home/talentum/MiniProject/noc_regions.csv", "noc.csv"

In [10]:
# Schema for the athlete events data: (column name, Spark type) pairs in column
# order; every field is declared nullable.
athlete_schema = StructType([
    StructField(column_name, column_type, True)
    for column_name, column_type in (
        ("ID", IntegerType()),
        ("Name", StringType()),
        ("Sex", StringType()),
        ("Age", FloatType()),
        ("Height", FloatType()),
        ("Weight", FloatType()),
        ("Team", StringType()),
        ("Games", StringType()),
        ("City", StringType()),
        ("Sport", StringType()),
        ("Event", StringType()),
    )
])


In [11]:
# Schema for the winners data, built field by field; every field is nullable.
winners_schema = (
    StructType()
    .add("ID", IntegerType(), True)
    .add("Name", StringType(), True)
    .add("Team", StringType(), True)
    .add("Games", StringType(), True)
    .add("Sport", StringType(), True)
    .add("Event", StringType(), True)
    .add("Medal", StringType(), True)
)

In [12]:
# Schema for the NOC regions data: three nullable string columns.
noc_schema = StructType([
    StructField(column_name, StringType(), True)
    for column_name in ("NOC", "region", "notes")
])

In [13]:
# Load the three CSV files from the local filesystem, each with its explicit
# schema and with the first line treated as a header row.
localdf_athlete = spark.read.schema(athlete_schema).option("header", True).csv(local_athlete)
local_df_winners = spark.read.schema(winners_schema).option("header", True).csv(local_winners)
local_df_noc = spark.read.schema(noc_schema).option("header", True).csv(local_noc)

In [15]:
# Reduce every DataFrame to one partition so each is written out as a single file.
localdf_athlete, local_df_winners, local_df_noc = (
    frame.coalesce(1)
    for frame in (localdf_athlete, local_df_winners, local_df_noc)
)

In [16]:
# Persist each DataFrame to HDFS as Parquet, replacing any existing output.
# NOTE: the target paths end in ".csv" although the content written is Parquet.
localdf_athlete.write.parquet(hdfs_athlete, mode='overwrite')
local_df_winners.write.parquet(hdfs_winners, mode='overwrite')
local_df_noc.write.parquet(hdfs_noc, mode='overwrite')


In [17]:
# -----------------   reading data from HDFS    --------------------------------

# Load the Parquet data back from HDFS, applying the same explicit schemas.
df_athlete = spark.read.load(hdfs_athlete, format="parquet", schema=athlete_schema)
df_winners = spark.read.load(hdfs_winners, format="parquet", schema=winners_schema)
df_noc = spark.read.load(hdfs_noc, format="parquet", schema=noc_schema)

In [18]:
# -----------------   splitting athlete    ---------------------------------

## Games is split on a single space (e.g. "1992 Summer"): the first piece
## becomes Year and the second becomes Season.
games_parts = split(df_athlete['Games'], ' ')
df_athlete = (
    df_athlete
    .withColumn('Year', games_parts.getItem(0))
    .withColumn('Season', games_parts.getItem(1))
)

In [19]:
# -----------------   formatting athlete into km, m, etc    ---------------------------------

# Long-form unit -> abbreviation. Rules are applied in insertion order, so the
# "kilo..." spellings are rewritten before the bare "metres"/"meters" patterns.
replacements = {
    'kilometres': 'km',
    'kilometers': 'km',
    'metres': 'm',
    'meters': 'm'
}

# Wrap the Event column in one regexp_replace per rule, then overwrite Event once.
event_abbreviated = col("Event")
for pattern, abbreviation in replacements.items():
    event_abbreviated = regexp_replace(event_abbreviated, pattern, abbreviation)
df_athlete = df_athlete.withColumn("Event", event_abbreviated)

In [20]:
# -----------------   Calculate BMI    ---------------------------------

# BMI = Weight / (Height / 100)^2, rounded to 2 decimals and stored as a float.
# `**` binds tighter than `/`, so the scaled height is squared before the division.
# Assumes Height is in centimetres and Weight in kilograms — TODO confirm against the data source.
# `round` here is pyspark.sql.functions.round (imported earlier), not the Python built-in.
# The column is computed once; an earlier identical withColumn without the cast
# was immediately overwritten by this one and has been removed.
df_athlete = df_athlete.withColumn(
    "BMI",
    round(col("Weight") / (col("Height") / 100) ** 2, 2).cast(FloatType())
)

In [21]:
# -----------------   Join athlete_events with winners    ---------------------------------

# Inner join on 'ID' only, keeping the athlete's ID/Name and the winner's
# Team/Games/Sport/Medal.
# NOTE(review): because ID is the only key, every athlete row is paired with
# every winners row of the same ID, which repeats identical output rows.
# Adding 'Games'/'Event' to the key would remove the fan-out, but Event has
# been abbreviated on the athlete side only, so the two Event columns would
# first need the same normalisation — confirm the intended key.
df_selected = df_athlete.join(df_winners, ["ID"], "inner") \
    .select(
        df_athlete.ID,
        df_athlete.Name,
        df_winners.Team,
        df_winners.Games,
        df_winners.Sport,
        df_winners.Medal
    )

# show() only prints and returns None, so it is called separately to keep
# df_selected bound to the DataFrame.
df_selected.show(truncate=False)

+---+------------------------+--------------+-----------+----------+------+
|ID |Name                    |Team          |Games      |Sport     |Medal |
+---+------------------------+--------------+-----------+----------+------+
|4  |Edgar Lindenau Aabye    |Denmark/Sweden|1900 Summer|Tug-Of-War|Gold  |
|15 |Arvo Ossian Aaltonen    |Finland       |1920 Summer|Swimming  |Bronze|
|15 |Arvo Ossian Aaltonen    |Finland       |1920 Summer|Swimming  |Bronze|
|15 |Arvo Ossian Aaltonen    |Finland       |1920 Summer|Swimming  |Bronze|
|15 |Arvo Ossian Aaltonen    |Finland       |1920 Summer|Swimming  |Bronze|
|15 |Arvo Ossian Aaltonen    |Finland       |1920 Summer|Swimming  |Bronze|
|15 |Arvo Ossian Aaltonen    |Finland       |1920 Summer|Swimming  |Bronze|
|15 |Arvo Ossian Aaltonen    |Finland       |1920 Summer|Swimming  |Bronze|
|15 |Arvo Ossian Aaltonen    |Finland       |1920 Summer|Swimming  |Bronze|
|15 |Arvo Ossian Aaltonen    |Finland       |1920 Summer|Swimming  |Bronze|
|15 |Arvo Os

In [22]:
# -----------------   Joining athlete_events with noc    ---------------------------------

# Inner join matching the athlete's Team against the NOC region name.
# NOTE: a region that maps to several NOC codes (e.g. China -> HKG and CHN)
# yields one output row per code for each athlete row.
df_joined_noc = df_athlete.join(df_noc, df_athlete.Team == df_noc.region, "inner") \
    .select(df_athlete.ID, df_athlete.Name, df_athlete.Sport, df_noc.NOC, df_noc.region)

# show() only prints and returns None, so it is called separately to keep
# df_joined_noc bound to the DataFrame.
df_joined_noc.show(truncate=False)

+---+--------------------------------------+--------------------+---+-----------+
|ID |Name                                  |Sport               |NOC|region     |
+---+--------------------------------------+--------------------+---+-----------+
|1  |A Dijiang                             |Basketball          |HKG|China      |
|1  |A Dijiang                             |Basketball          |CHN|China      |
|2  |A Lamusi                              |Judo                |HKG|China      |
|2  |A Lamusi                              |Judo                |CHN|China      |
|3  |Gunnar Nielsen Aaby                   |Football            |DEN|Denmark    |
|5  |Christine Jacoba Aaftink              |Speed Skating       |NED|Netherlands|
|5  |Christine Jacoba Aaftink              |Speed Skating       |NED|Netherlands|
|5  |Christine Jacoba Aaftink              |Speed Skating       |NED|Netherlands|
|5  |Christine Jacoba Aaftink              |Speed Skating       |NED|Netherlands|
|5  |Christine J

In [23]:
# -----------------  INTO HIVE    ---------------------------------
# Make sure the target database exists and switch to it.
spark.sql("CREATE DATABASE IF NOT EXISTS project")
spark.sql("USE project")

# Hive table name -> DataFrame to store in it.
hive_tables = (
    ("athlete_events", df_athlete),
    ("winners", df_winners),
    ("noc", df_noc),
)
hive_table_names = [table_name for table_name, frame in hive_tables]

# Drop any previous versions of the tables.
for table_name in hive_table_names:
    spark.sql("DROP TABLE IF EXISTS " + table_name)

# ORC table
for table_name, frame in hive_tables:
    frame.write.saveAsTable(table_name, format="orc", mode='overwrite')

# Read each table back through SQL and print it untruncated.
for table_name in hive_table_names:
    spark.sql("SELECT * FROM " + table_name).show(truncate=False)

+---+------------------------+---+----+------+------+--------------+-----------+-----------+--------------------+-------------------------------------------+----+------+-----+
|ID |Name                    |Sex|Age |Height|Weight|Team          |Games      |City       |Sport               |Event                                      |Year|Season|BMI  |
+---+------------------------+---+----+------+------+--------------+-----------+-----------+--------------------+-------------------------------------------+----+------+-----+
|1  |A Dijiang               |M  |24.0|180.0 |80.0  |China         |1992 Summer|Barcelona  |Basketball          |Basketball Men's Basketball                |1992|Summer|24.69|
|2  |A Lamusi                |M  |23.0|170.0 |60.0  |China         |2012 Summer|London     |Judo                |Judo Men's Extra-Lightweight               |2012|Summer|20.76|
|3  |Gunnar Nielsen Aaby     |M  |24.0|null  |null  |Denmark       |1920 Summer|Antwerpen  |Football            |Footbal