In [1]:
import pyspark
from pyspark.sql import SparkSession
from pyspark.conf import SparkConf
from pyspark.context import SparkContext

In [2]:
credentials_location = '/home/ojekky/.gc/my-creds.json'

conf = SparkConf() \
    .setMaster('local[*]') \
    .setAppName('test') \
    .set("spark.jars", "/home/ojekky/data-engineering-zoomcamp/05-batch/code/lib/gcs-connector-hadoop2-2.2.5.jar") \
    .set("spark.hadoop.google.cloud.auth.service.account.enable", "true") \
    .set("spark.hadoop.google.cloud.auth.service.account.json.keyfile", credentials_location)

In [3]:
sc = SparkContext(conf=conf)

hadoop_conf = sc._jsc.hadoopConfiguration()

hadoop_conf.set("fs.AbstractFileSystem.gs.impl",  "com.google.cloud.hadoop.fs.gcs.GoogleHadoopFS")
hadoop_conf.set("fs.gs.impl", "com.google.cloud.hadoop.fs.gcs.GoogleHadoopFileSystem")
hadoop_conf.set("fs.gs.auth.service.account.json.keyfile", credentials_location)
hadoop_conf.set("fs.gs.auth.service.account.enable", "true")

25/04/01 12:20:18 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).


In [4]:
spark = SparkSession.builder \
    .config(conf=sc.getConf()) \
    .getOrCreate()

In [5]:
df_divvy = spark.read.csv("gs://global-rookery-448215-m8_divvy_bikes/divvy-tripdata/*/*", header=True, inferSchema=True)

                                                                                

In [6]:
import pandas as pd

In [7]:
df_divvy.dtypes

[('ride_id', 'string'),
 ('rideable_type', 'string'),
 ('started_at', 'string'),
 ('ended_at', 'string'),
 ('start_station_name', 'string'),
 ('start_station_id', 'string'),
 ('end_station_name', 'string'),
 ('end_station_id', 'string'),
 ('start_lat', 'string'),
 ('start_lng', 'string'),
 ('end_lat', 'string'),
 ('end_lng', 'string'),
 ('member_casual', 'string')]

In [14]:
df_eni = df_divvy.head(100)

In [15]:
spark.createDataFrame(df_eni).schema

StructType([StructField('ride_id', StringType(), True), StructField('rideable_type', StringType(), True), StructField('started_at', StringType(), True), StructField('ended_at', StringType(), True), StructField('start_station_name', StringType(), True), StructField('start_station_id', StringType(), True), StructField('end_station_name', StringType(), True), StructField('end_station_id', StringType(), True), StructField('start_lat', StringType(), True), StructField('start_lng', StringType(), True), StructField('end_lat', StringType(), True), StructField('end_lng', StringType(), True), StructField('member_casual', StringType(), True)])

In [18]:
from pyspark.sql import types

In [19]:
divvy_schema = types.StructType([
    types.StructField('ride_id', types.StringType(), True),
    types.StructField('rideable_type', types.StringType(), True),
    types.StructField('started_at', types.TimestampType(), True),
    types.StructField('ended_at', types.TimestampType(), True),
    types.StructField('start_station_name', types.StringType(), True),
    types.StructField('start_station_id', types.StringType(), True),
    types.StructField('end_station_name', types.StringType(), True),
    types.StructField('end_station_id', types.StringType(), True),
    types.StructField('start_lat', types.DoubleType(), True),
    types.StructField('start_lng', types.DoubleType(), True),
    types.StructField('end_lat', types.DoubleType(), True),
    types.StructField('end_lng', types.DoubleType(), True),
    types.StructField('member_casual', types.StringType(), True)
])

In [25]:
df_divvy = spark.read.csv(
    "gs://global-rookery-448215-m8_divvy_bikes/divvy-tripdata/*/*", 
    header = True, 
    schema = divvy_schema,
    escape='"',
    nullValue="null"
)

df_divvy \
    .repartition(1) \
    .write.parquet("gs://global-rookery-448215-m8_divvy_bikes/divvy_paq")

df_divvy.printSchema()
print(f"Number of partitions: {df_divvy.rdd.getNumPartitions()}")
print(f"Total records: {df_divvy.count()}")

25/04/01 15:40:20 ERROR Utils: Aborting task                        (0 + 4) / 4]
java.lang.OutOfMemoryError: Java heap space
	at java.base/java.nio.ByteBuffer.wrap(ByteBuffer.java:393)
	at java.base/java.nio.ByteBuffer.wrap(ByteBuffer.java:422)
	at org.apache.parquet.io.api.Binary$ByteArrayBackedBinary.toByteBuffer(Binary.java:371)
	at org.apache.parquet.schema.PrimitiveComparator$9.compareBinary(PrimitiveComparator.java:219)
	at org.apache.parquet.schema.PrimitiveComparator$BinaryComparator.compareNotNulls(PrimitiveComparator.java:186)
	at org.apache.parquet.schema.PrimitiveComparator$BinaryComparator.compareNotNulls(PrimitiveComparator.java:183)
	at org.apache.parquet.schema.PrimitiveComparator.compare(PrimitiveComparator.java:63)
	at org.apache.parquet.column.statistics.BinaryStatistics.updateStats(BinaryStatistics.java:62)
	at org.apache.parquet.column.impl.ColumnWriterBase.write(ColumnWriterBase.java:242)
	at org.apache.parquet.io.MessageColumnIO$MessageColumnIORecordConsumer.addB

ConnectionRefusedError: [Errno 111] Connection refused

In [22]:
df_divvy.dtypes

[('ride_id', 'string'),
 ('rideable_type', 'string'),
 ('started_at', 'timestamp'),
 ('ended_at', 'timestamp'),
 ('start_station_name', 'string'),
 ('start_station_id', 'string'),
 ('end_station_name', 'string'),
 ('end_station_id', 'string'),
 ('start_lat', 'double'),
 ('start_lng', 'double'),
 ('end_lat', 'double'),
 ('end_lng', 'double'),
 ('member_casual', 'string')]

In [23]:
df_divvy.show()

+----------------+-------------+-------------------+-------------------+------------------+----------------+--------------------+--------------+---------+---------+------------------+----------+-------------+
|         ride_id|rideable_type|         started_at|           ended_at|start_station_name|start_station_id|    end_station_name|end_station_id|start_lat|start_lng|           end_lat|   end_lng|member_casual|
+----------------+-------------+-------------------+-------------------+------------------+----------------+--------------------+--------------+---------+---------+------------------+----------+-------------+
|99FEC93BA843FB20|electric_bike|2021-06-13 14:31:28|2021-06-13 14:34:11|              NULL|            NULL|                NULL|          NULL|     41.8|   -87.59|              41.8|     -87.6|       member|
|06048DCFC8520CAF|electric_bike|2021-06-04 11:18:02|2021-06-04 11:24:19|              NULL|            NULL|                NULL|          NULL|    41.79|   -87.59|

                                                                                