In [1]:
import os

import pyspark
from pyspark.sql import SparkSession
from pyspark.conf import SparkConf
from pyspark.context import SparkContext

In [2]:
# Path to the GCP service-account key file. The standard
# GOOGLE_APPLICATION_CREDENTIALS environment variable takes precedence so the
# notebook is not tied to one machine's home directory; when it is unset or
# empty, the original local path is used.
credentials_location = (
    os.environ.get('GOOGLE_APPLICATION_CREDENTIALS')
    or '/home/ojekky/.gc/my-creds.json'
)

# Spark configuration: local master on all cores, the GCS connector jar
# registered via spark.jars, and service-account authentication enabled and
# pointed at the key file above.
conf = SparkConf() \
    .setMaster('local[*]') \
    .setAppName('test') \
    .set("spark.jars", "/home/ojekky/data-engineering-zoomcamp/05-batch/code/lib/gcs-connector-hadoop2-2.2.5.jar") \
    .set("spark.hadoop.google.cloud.auth.service.account.enable", "true") \
    .set("spark.hadoop.google.cloud.auth.service.account.json.keyfile", credentials_location)

In [3]:
# Reuse the active SparkContext when one already exists, so re-running this
# cell does not fail with "Cannot run multiple SparkContexts at once";
# otherwise a new context is created from `conf`.
sc = SparkContext.getOrCreate(conf=conf)

# Hadoop-level configuration of the running context (reached through the
# underlying Java context).
hadoop_conf = sc._jsc.hadoopConfiguration()

# Map the gs:// scheme to the GCS connector classes and configure
# service-account authentication with the key file.
hadoop_conf.set("fs.AbstractFileSystem.gs.impl",  "com.google.cloud.hadoop.fs.gcs.GoogleHadoopFS")
hadoop_conf.set("fs.gs.impl", "com.google.cloud.hadoop.fs.gcs.GoogleHadoopFileSystem")
hadoop_conf.set("fs.gs.auth.service.account.json.keyfile", credentials_location)
hadoop_conf.set("fs.gs.auth.service.account.enable", "true")

25/04/12 10:05:36 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).


In [4]:
# Build the SparkSession (or fetch the existing one) using the settings of
# the already-running SparkContext.
session_builder = SparkSession.builder.config(conf=sc.getConf())
spark = session_builder.getOrCreate()

In [5]:
# GCS locations of the two raw parquet folders.
lat_path = 'gs://global-rookery-448215-m8_bike_data_raw/lat_bike_paq/*'
old_path = 'gs://global-rookery-448215-m8_bike_data_raw/old_bike_paq/*'

# Load each folder into its own DataFrame.
df_lat = spark.read.parquet(lat_path)
df_old = spark.read.parquet(old_path)

                                                                                

In [6]:
import pandas as pd

In [7]:
from pyspark.sql import functions as F

In [8]:
# Display the (column name, type) pairs of the df_lat DataFrame.
df_lat.dtypes

[('ride_id', 'int'),
 ('rideable_type', 'int'),
 ('started_at', 'timestamp'),
 ('ended_at', 'timestamp'),
 ('start_station_name', 'string'),
 ('start_station_id', 'int'),
 ('end_station_name', 'string'),
 ('end_station_id', 'int'),
 ('start_lat', 'double'),
 ('start_lng', 'double'),
 ('end_lat', 'double'),
 ('end_lng', 'double'),
 ('member_casual', 'int')]

In [9]:
# Display the (column name, type) pairs of the df_old DataFrame.
df_old.dtypes

[('Duration', 'bigint'),
 ('Start date', 'timestamp'),
 ('End date', 'timestamp'),
 ('Start station number', 'int'),
 ('Start station', 'string'),
 ('End station number', 'int'),
 ('End station', 'string'),
 ('Bike number', 'int'),
 ('Member type', 'int')]

In [10]:
# Old dataset (2010-2020-03): map its column names onto the names used by
# the new dataset so both frames can be combined later.
# NOTE(review): 'Bike number' is mapped to 'ride_id' — confirm that a bike
# number is really meant to serve as the ride identifier.
old_to_new_names = {
    'Start date': 'started_at',
    'End date': 'ended_at',
    'Start station number': 'start_station_id',
    'Start station': 'start_station_name',
    'End station number': 'end_station_id',
    'End station': 'end_station_name',
    'Member type': 'member_casual',
    'Bike number': 'ride_id',
    'Duration': 'trip_duration',
}

# Apply the renames one by one, in the order listed above.
for old_name, new_name in old_to_new_names.items():
    df_old = df_old.withColumnRenamed(old_name, new_name)

In [11]:
# Columns absent from the old dataset, with the type each one should carry.
# They are appended as typed NULL columns.
null_columns = [
    ('rideable_type', 'int'),
    ('start_lat', 'double'),
    ('start_lng', 'double'),
    ('end_lat', 'double'),
    ('end_lng', 'double'),
]

for col_name, col_type in null_columns:
    df_old = df_old.withColumn(col_name, F.lit(None).cast(col_type))

In [13]:
# New dataset (2020-05-2025): it has no duration column, so derive
# trip_duration as the difference between the end and start Unix timestamps.
started_epoch = F.unix_timestamp('started_at')
ended_epoch = F.unix_timestamp('ended_at')
df_lat = df_lat.withColumn('trip_duration', ended_epoch - started_epoch)

In [14]:
# Shared column list, in the order the combined frame should have.
common_columns = [
    'ride_id',
    'rideable_type',
    'started_at',
    'ended_at',
    'trip_duration',
    'start_station_id',
    'start_station_name',
    'end_station_id',
    'end_station_name',
    'member_casual',
    'start_lat',
    'start_lng',
    'end_lat',
    'end_lng',
]

# Project both frames onto the shared columns, then stack them by column
# name (old rows first, then the new ones).
old_aligned = df_old.select(common_columns)
lat_aligned = df_lat.select(common_columns)
combined_df = old_aligned.unionByName(lat_aligned)

In [16]:
# Display the (column name, type) pairs of the combined DataFrame.
combined_df.dtypes

[('ride_id', 'int'),
 ('rideable_type', 'int'),
 ('started_at', 'timestamp'),
 ('ended_at', 'timestamp'),
 ('trip_duration', 'bigint'),
 ('start_station_id', 'int'),
 ('start_station_name', 'string'),
 ('end_station_id', 'int'),
 ('end_station_name', 'string'),
 ('member_casual', 'int'),
 ('start_lat', 'double'),
 ('start_lng', 'double'),
 ('end_lat', 'double'),
 ('end_lng', 'double')]

In [17]:
# Bring the combined data into a single partition and write it as parquet to
# the bucket, replacing any previous output at that path.
single_partition_df = combined_df.repartition(1)
single_partition_df.write.mode('overwrite').parquet('gs://global-rookery-448215-m8_bike_data_raw/full_raw_paq')

                                                                                