In [27]:
from pyspark.sql import SparkSession
import pyspark.sql.functions as F
import os

In [28]:
aws_access_key = os.environ["AWS_ACCESS_KEY"]
aws_secret_key = os.environ["AWS_SECRET_KEY"]
aws_region = os.environ["AWS_REGION"]
warehouse_location = os.environ["WAREHOUSE_LOCATION"]
metastore_uri = os.environ["METASTORE_URI"]

spark = SparkSession.builder.appName("Warehouse")\
    .master("spark://spark-master:7077") \
    .config("spark.hadoop.fs.s3a.aws.credentials.provider", "org.apache.hadoop.fs.s3a.SimpleAWSCredentialsProvider") \
    .config("spark.hadoop.fs.s3a.access.key", aws_access_key) \
    .config("spark.hadoop.fs.s3a.secret.key", aws_secret_key) \
    .config("spark.sql.catalogImplementation", "hive") \
    .config("spark.sql.warehouse.dir", warehouse_location) \
    .config("spark.sql.hive.metastore.uris", metastore_uri) \
    .config("hive.metastore.uris", metastore_uri) \
    .config("hive.metastore.warehouse.dir", warehouse_location) \
    .config("hive.hadoop.fs.s3a.access.key", aws_access_key) \
    .config("hive.hadoop.fs.s3a.secret.key", aws_secret_key) \
    .config("spark.sql.catalog.spark_catalog", "org.apache.spark.sql.delta.catalog.DeltaCatalog") \
    .config("spark.sql.extensions", "io.delta.sql.DeltaSparkSessionExtension") \
    .config("spark.driver.memory", "5G") \
    .config("spark.memory.offHeap.size","16g") \
    .config("spark.memory.offHeap.enabled", True) \
    .enableHiveSupport() \
    .getOrCreate()

In [29]:
df = spark.read.csv(
    'craigslist_vehicles.csv', 
    header=True, 
)

                                                                                

In [30]:
df.show(truncate=False)

[Stage 1:>                                                          (0 + 1) / 1]

+------+----------+--------------------------------------------------------------------------------------------+-------+------------------------------+-------+------+------------+--------------------+---------+-----------+------+--------+------------+------------+-----------------+-----+-----------+--------+-----------+-------------------------------------------------------------------+-----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------

                                                                                

In [31]:
df.printSchema()

root
 |-- _c0: string (nullable = true)
 |-- id: string (nullable = true)
 |-- url: string (nullable = true)
 |-- region: string (nullable = true)
 |-- region_url: string (nullable = true)
 |-- price: string (nullable = true)
 |-- year: string (nullable = true)
 |-- manufacturer: string (nullable = true)
 |-- model: string (nullable = true)
 |-- condition: string (nullable = true)
 |-- cylinders: string (nullable = true)
 |-- fuel: string (nullable = true)
 |-- odometer: string (nullable = true)
 |-- title_status: string (nullable = true)
 |-- transmission: string (nullable = true)
 |-- VIN: string (nullable = true)
 |-- drive: string (nullable = true)
 |-- size: string (nullable = true)
 |-- type: string (nullable = true)
 |-- paint_color: string (nullable = true)
 |-- image_url: string (nullable = true)
 |-- description: string (nullable = true)
 |-- county: string (nullable = true)
 |-- state: string (nullable = true)
 |-- lat: string (nullable = true)
 |-- long: string (nullable = 

In [32]:
df = df\
.withColumn("year", F.col("year").cast("int"))\
.filter(F.length('year') == 4)

In [33]:
columns = [
    "id", 
    "region", 
    "price", 
    "year", 
    "manufacturer", 
    "model", 
    "condition", 
    "cylinders", 
    "fuel", 
    "odometer", 
    "title_status", 
    "transmission", 
    "drive", 
    "size", 
    "type", 
    "paint_color", 
    "state", 
    "lat", 
    "long"
]
df = df.select(columns)
df = df.withColumn("year", F.col("year").cast("string"))\
    .withColumn("year", F.expr("TO_DATE(CONCAT(year, '-01-01'), 'yyyy-MM-dd')"))\
    .filter(F.col("year").isNotNull())
df = df.filter(F.col("id").isNotNull())
df.write.mode("overwrite").saveAsTable("craigslist_vehicles_bronze.craigslist_vehicles")
spark.sql("show tables in craigslist_vehicles_bronze").show()
spark.sql("select * from craigslist_vehicles_bronze.craigslist_vehicles").show()

                                                                                

+--------------------+-------------------+-----------+
|           namespace|          tableName|isTemporary|
+--------------------+-------------------+-----------+
|craigslist_vehicl...|craigslist_vehicles|      false|
+--------------------+-------------------+-----------+



[Stage 3:>                                                          (0 + 1) / 1]

+----------+--------+-----+----------+------------+------------------+---------+-----------+------+--------+------------+------------+-----+---------+---------+-----------+--------------------+--------------------+-----------+
|        id|  region|price|      year|manufacturer|             model|condition|  cylinders|  fuel|odometer|title_status|transmission|drive|     size|     type|paint_color|               state|                 lat|       long|
+----------+--------+-----+----------+------------+------------------+---------+-----------+------+--------+------------+------------+-----+---------+---------+-----------+--------------------+--------------------+-----------+
|7316919327|portland| 7500|2004-01-01|        ford|         excursion|     good|8 cylinders|   gas|170000.0|       clean|   automatic|  4wd|     NULL|      SUV|     silver|                  or|             45.4446|  -122.5372|
|7316292635|portland| 2300|2006-01-01|        ford|              e250|     NULL|       NULL|

                                                                                

In [34]:
spark.stop()