In [29]:
import pyspark

In [30]:
from pyspark.sql import SparkSession
from pyspark.sql.types import * 

# Create the Spark session for this notebook (or reuse one that already
# exists), running locally under the application name "simple etl job".
spark = (
    SparkSession.builder
    .master("local[*]")
    .appName("simple etl job")
    .getOrCreate()
)

In [31]:
# Show the session object as the cell output.
spark

In [32]:
# Path of the raw CSV file that the Extract step loads.
pathfile = "/home/jovyan/spark/autos.csv"

### **Connection to PostgreSQL**

In [33]:
import os
import psycopg2
from dotenv import load_dotenv

# Load the previously defined environment variables before reading them.
load_dotenv()

# Open the PostgreSQL connection. Every setting except the SSL mode is read
# from a PG_* environment variable; a variable that is not set yields None.
connection = psycopg2.connect(
    sslmode='require',
    **{
        option: os.getenv(variable)
        for option, variable in (
            ("host", 'PG_HOST'),
            ("port", 'PG_PORT'),
            ("user", 'PG_USER'),
            ("password", 'PG_PASSWORD'),
            ("dbname", 'PG_DATABASE'),
        )
    },
)

# Autocommit so that write commands reach the database immediately.
connection.autocommit = True
cursor = connection.cursor()

# Connectivity check: send a literal to the server and print what comes back.
cursor.execute('SELECT %s as connected;', ('Connection to postgres successful!',))
print(cursor.fetchone())

('Connection to postgres successful!',)


In [43]:
# Explicit schema for the CSV input, written as (column name, Spark type)
# pairs in file order. Every column is declared nullable.
# NOTE(review): yearOfCreation, yearCrawled, monthOfCreation, monthCrawled,
# NoOfHrsOnline and monthsOld are declared as timestamps although their names
# suggest plain numbers - confirm against the CSV. All six are dropped in the
# Transform step.
schema = StructType([
    StructField(column_name, column_type, True)
    for column_name, column_type in [
        ("dateCrawled", TimestampType()),
        ("name", StringType()),
        ("seller", StringType()),
        ("offerType", StringType()),
        ("price", LongType()),
        ("abtest", StringType()),
        ("vehicleType", StringType()),
        ("yearOfRegistration", StringType()),
        ("gearbox", StringType()),
        ("powerPS", ShortType()),
        ("model", StringType()),
        ("kilometer", LongType()),
        ("monthOfRegistration", StringType()),
        ("fuelType", StringType()),
        ("brand", StringType()),
        ("notRepairedDamage", StringType()),
        ("dateCreated", DateType()),
        ("nrOfPictures", ShortType()),
        ("postalCode", StringType()),
        ("lastSeen", TimestampType()),
        ("yearOfCreation", TimestampType()),
        ("yearCrawled", TimestampType()),
        ("monthOfCreation", TimestampType()),
        ("monthCrawled", TimestampType()),
        ("NoOfDaysOnline", IntegerType()),
        ("NoOfHrsOnline", TimestampType()),
        ("yearsOld", IntegerType()),
        ("monthsOld", TimestampType()),
    ]
])

### **Extract**

In [44]:
# Read the CSV at `pathfile` into a DataFrame, treating the first line as the
# header and applying the explicit `schema`.
sdf = spark.read.csv(pathfile, schema=schema, header=True)

### **Transform**

In [45]:
# Clean the dataset: remove the columns that are not kept for loading.
sdf_filtered = sdf.drop(
    "dateCrawled",
    "nrOfPictures",
    "lastSeen",
    "seller",
    "offerType",
    "yearOfCreation",
    "yearCrawled",
    "monthOfCreation",
    "NoOfDaysOnline",
    "NoOfHrsOnline",
    "yearsOld",
    "monthsOld",
    "monthCrawled",
)

In [46]:
# Keep only the rows in which no remaining column is null.
sdf_filtered = sdf_filtered.dropna(how="any")

In [47]:
# Print the column names and types that remain after cleaning.
sdf_filtered.printSchema()

root
 |-- name: string (nullable = true)
 |-- price: long (nullable = true)
 |-- abtest: string (nullable = true)
 |-- vehicleType: string (nullable = true)
 |-- yearOfRegistration: string (nullable = true)
 |-- gearbox: string (nullable = true)
 |-- powerPS: short (nullable = true)
 |-- model: string (nullable = true)
 |-- kilometer: long (nullable = true)
 |-- monthOfRegistration: string (nullable = true)
 |-- fuelType: string (nullable = true)
 |-- brand: string (nullable = true)
 |-- notRepairedDamage: string (nullable = true)
 |-- dateCreated: date (nullable = true)
 |-- postalCode: string (nullable = true)



In [24]:
# DDL for the target table. IF NOT EXISTS lets the statement run again without
# failing when the table is already there. The columns are listed in the same
# order as the columns of sdf_filtered.
# NOTE(review): price and kilometer are `integer` here but LongType in the
# Spark schema - confirm the values always fit a 32-bit integer.
createTableQuery = """
    CREATE TABLE IF NOT EXISTS cars_table 
    (   name VARCHAR(255) NOT NULL, 
        price integer NOT NULL,
        abtest VARCHAR(255) NOT NULL, 
        vehicleType VARCHAR(255), 
        yearOfRegistration VARCHAR(4) NOT NULL, 
        gearbox VARCHAR(255), 
        powerPS integer NOT NULL,
        model VARCHAR(255), 
        kilometer integer, 
        monthOfRegistration VARCHAR(255) NOT NULL,
        fuelType VARCHAR(255), 
        brand VARCHAR(255) NOT NULL, 
        notRepairedDamage VARCHAR(255), 
        dateCreated DATE NOT NULL, 
        postalCode VARCHAR(255) NOT NULL);
  """

In [25]:
# Create cars_table on the server if it does not exist yet.
cursor.execute(createTableQuery)

In [48]:
# Read back and display every row currently stored in cars_table.
readDataQuery = 'SELECT * FROM cars_table;'
cursor.execute(readDataQuery)
cursor.fetchall()

[]

In [40]:
# The table now exists, so export the cleaned DataFrame as comma-delimited CSV
# with a header row, ready to be loaded into PostgreSQL.
# Spark writes a *directory* of part files at this path, not a single file.
# mode("overwrite") replaces the output of an earlier run; with the default
# save mode the write raises an error as soon as the path already exists,
# which made this cell fail whenever the notebook was run a second time.
sdf_filtered.write.mode("overwrite").options(header='True', delimiter=',') \
 .csv("/home/jovyan/spark/autos_filtered.csv")

In [41]:
# CSV file that the Load step copies into cars_table.
# NOTE(review): no cell in this notebook creates autos_filtered_1.csv - the
# write step targets ".../autos_filtered.csv". Presumably this file was
# produced by hand from that output; confirm, or derive the path in code.
pathfile_filtered = "/home/jovyan/spark/autos_filtered_1.csv"

### **Load**

In [50]:
#load data into postgresql
import csv
# Bulk-load the filtered CSV file into cars_table, then commit and close.
#
# COPY is run in CSV format so that quoted fields (for example a value that
# contains the delimiter) are parsed as CSV. copy_from() uses PostgreSQL's
# text format, which has no notion of CSV quoting, so such rows either fail
# or are stored with their quotes. HEADER makes the server skip the first
# line, which replaces the manual next() call.
# NOTE(review): Spark's CSV writer escapes an embedded double quote with a
# backslash by default, while COPY's CSV default is a doubled quote. If values
# can contain double quotes, align the two (ESCAPE on this side or the
# `escape` option on the writer) - confirm against the data.
cursor = connection.cursor()
# Explicit UTF-8 so the read does not depend on the machine's locale.
with open(pathfile_filtered, encoding="utf-8") as csvFile:
    cursor.copy_expert(
        "COPY cars_table FROM STDIN WITH (FORMAT csv, HEADER true, DELIMITER ',')",
        csvFile,
    )
connection.commit()
connection.close()
