In [15]:
from pyspark.sql import SparkSession
import pandas as pd
import pyarrow as parrow
import pyarrow.dataset as ds
import pyarrow.parquet as pq
from pyspark.sql import types
from pyspark.sql.functions import year

# Initialize a SparkSession
spark = SparkSession.builder \
            .config('spark.jars', 'gs://spark-lib/bigquery/spark-bigquery-with-dependencies_2.12-0.30.0.jar') \
            .getOrCreate()

# Read the CSV file into a DataFrame
schema = types.StructType([
    types.StructField("departure", types.TimestampType(), True),
    types.StructField("return", types.TimestampType(), True),
    types.StructField("departure_id", types.IntegerType(), True),
    types.StructField("departure_name", types.StringType(), True),
    types.StructField("return_id", types.IntegerType(), True),
    types.StructField("return_name", types.StringType(), True),
    types.StructField("distance (m)", types.DoubleType(), True),
    types.StructField("duration (sec.)", types.DoubleType(), True),
    types.StructField("avg_speed (km/h)", types.DoubleType(), True),
    types.StructField("departure_latitude", types.DoubleType(), True),
    types.StructField("departure_longitude", types.DoubleType(), True),
    types.StructField("return_latitude", types.DoubleType(), True),
    types.StructField("return_longitude", types.DoubleType(), True),
    types.StructField("Air temperature (degC)", types.DoubleType(), True)
    ])
    
raw = spark.read \
    .format("csv") \
    .option("compression", "gzip") \
    .option("header", True) \
    .option("inferSchema", True)\
    .schema(schema)\
    .load("gs://pfcllotsb7jsqqvbjrnw3s-datasets/bike-rides-data.tar.gz")



root
 |-- departure: timestamp (nullable = true)
 |-- return: timestamp (nullable = true)
 |-- departure_id: integer (nullable = true)
 |-- departure_name: string (nullable = true)
 |-- return_id: integer (nullable = true)
 |-- return_name: string (nullable = true)
 |-- distance: double (nullable = true)
 |-- duration_sec: double (nullable = true)
 |-- avg_speed_km_h: double (nullable = true)
 |-- departure_latitude: double (nullable = true)
 |-- departure_longitude: double (nullable = true)
 |-- return_latitude: double (nullable = true)
 |-- return_longitude: double (nullable = true)
 |-- air_temp_celcius: double (nullable = true)
 |-- year: integer (nullable = true)

+-------------------+-------------------+------------+------------------+---------+--------------------+--------+------------+------------------+------------------+-------------------+------------------+------------------+----------------+----+
|          departure|             return|departure_id|    departure_name|retu

In [21]:
# clean up data
raw = raw.withColumnRenamed("distance (m)", "distance")\
        .withColumnRenamed("duration (sec.)", "duration_sec")\
        .withColumnRenamed("avg_speed (km/h)", "avg_speed_km_h")\
        .withColumnRenamed("Air temperature (degC)", "air_temp_celcius")\
        .withColumn("year", year(raw["departure"]))\
        .filter(raw.departure.isNotNull()) 

raw.printSchema()
raw.show()
print(f"raw data count: {raw.count()}")

#some years are missing departure id, so, we need to fill those using departure name if possible
departure_points = raw.filter(raw.departure_id.isNotNull())\
                    .select("departure_id", "departure_name")\
                    .distinct()

departure_points.show()

raw.createOrReplaceTempView('raw')
departure_points.createOrReplaceTempView('dept_points')

cleaned = spark.sql("""
                      SELECT raw.departure, raw.return, nvl(raw.departure_id, dept_points.departure_id) as departure_id, raw.departure_name,raw.return_id, raw.return_name, 
                      raw.distance, raw.duration_sec, raw.avg_speed_km_h, raw.departure_latitude, 
                      raw.departure_longitude, raw.return_latitude, raw.return_longitude, 
                      raw.air_temp_celcius, raw.year
                      FROM raw
                      LEFT JOIN dept_points ON raw.departure_name = dept_points.departure_name
                      """)
cleaned.printSchema()
print(f"cleaned data count: {cleaned.count()}")

root
 |-- departure: timestamp (nullable = true)
 |-- return: timestamp (nullable = true)
 |-- departure_id: integer (nullable = true)
 |-- departure_name: string (nullable = true)
 |-- return_id: integer (nullable = true)
 |-- return_name: string (nullable = true)
 |-- distance: double (nullable = true)
 |-- duration_sec: double (nullable = true)
 |-- avg_speed_km_h: double (nullable = true)
 |-- departure_latitude: double (nullable = true)
 |-- departure_longitude: double (nullable = true)
 |-- return_latitude: double (nullable = true)
 |-- return_longitude: double (nullable = true)
 |-- air_temp_celcius: double (nullable = true)
 |-- year: integer (nullable = true)

+-------------------+-------------------+------------+------------------+---------+--------------------+--------+------------+------------------+------------------+-------------------+------------------+------------------+----------------+----+
|          departure|             return|departure_id|    departure_name|retu

[Stage 29:>                                                         (0 + 1) / 1]

raw data count: 12157458


                                                                                

+------------+--------------------+
|departure_id|      departure_name|
+------------+--------------------+
|         757|           Painiitty|
|         719|              Säteri|
|         263| Herttoniemen kirkko|
|         615|        Tiistiläntie|
|          35|     Cygnaeuksenkatu|
|         709|     Yhdyskunnankuja|
|         737|        Muurarinkuja|
|         129|         Pernajantie|
|         122|     Lintulahdenkatu|
|          99|          Muusantori|
|          93|         Torpanranta|
|         107|         Tenholantie|
|          82|         Töölöntulli|
|         555|        Kalevalantie|
|         521|      Kulttuuriaukio|
|         525|          Mäntyviita|
|         563|  Mankkaanlaaksontie|
|         509|       Revontulentie|
|         701| Gallen-Kallelan tie|
|         571|Tapiolan urheilup...|
+------------+--------------------+
only showing top 20 rows

root
 |-- departure: timestamp (nullable = true)
 |-- return: timestamp (nullable = true)
 |-- departure_id: i



cleaned data count: 13143189


                                                                                

In [2]:
# save clean data into datalake
root_path = f'gs://zoomcamp_project_pfcllotsb7jsqqvbjrnw3s/helinski-bike-trips'
column = "year"
partitioned = cleaned.repartition(column)
partitioned.write.partitionBy(column).mode("overwrite").parquet(root_path)


24/04/13 19:32:04 WARN org.apache.spark.deploy.yarn.YarnAllocator: Container from a bad node: container_1713025972259_0006_01_000002 on host: pfcllotsb7jsqqvbjrnw3s-dataproc-m.europe-west1-b.c.data-eng-zoomcamp-420022.internal. Exit status: 137. Diagnostics: [2024-04-13 19:32:04.827]Container killed on request. Exit code is 137
[2024-04-13 19:32:04.827]Container exited with a non-zero exit code 137. 
[2024-04-13 19:32:04.828]Killed by external signal
.
24/04/13 19:32:04 WARN org.apache.spark.scheduler.cluster.YarnSchedulerBackend$YarnSchedulerEndpoint: Requesting driver to remove executor 2 for reason Container from a bad node: container_1713025972259_0006_01_000002 on host: pfcllotsb7jsqqvbjrnw3s-dataproc-m.europe-west1-b.c.data-eng-zoomcamp-420022.internal. Exit status: 137. Diagnostics: [2024-04-13 19:32:04.827]Container killed on request. Exit code is 137
[2024-04-13 19:32:04.827]Container exited with a non-zero exit code 137. 
[2024-04-13 19:32:04.828]Killed by external signal
.
2

In [14]:
# Write the data to a bigQuery table, partitioned by year, clustered by 
from google.cloud import bigquery

# BigQuery client setup
client = bigquery.Client()

# Specify your dataset and table
dataset_id = 'zoomcamp_db'
table_id = 'helinski_bike_trips'
table_full_id = f"{client.project}.{dataset_id}.{table_id}"

# create the table
# table = bigquery.Table(table_full_id)
# table = client.create_table(table, exists_ok=True)

# Write the DataFrame to BigQuery, schema should be inferred automatically from pyspark
partitioned.write.format("bigquery") \
    .option('table', table_full_id) \
    .option("writeMethod", "direct") \
    .option("partitionField", "departure")\
    .option("partitionType", "YEAR")\
    .option("clusteredFields", "departure_id")\
    .mode('overwrite') \
    .save()


# set table partitioning and clustering
# table.time_partitioning = bigquery.TimePartitioning(type_=bigquery.TimePartitioningType.YEAR, field="year")
# table.clustering_fields = ["departure_id"]
# table = client.update_table(table) # API request



                                                                                