In [1]:
import psycopg2
import os
from pyspark.sql import SparkSession
from pyspark.sql.types import StructField, StructType, StringType, LongType, TimestampType, ShortType, DateType
from pyspark.sql.functions import col

In [2]:
def initialize_Spark():
    """Build (or reuse) the SparkSession used by this notebook.

    The session is configured with master ``local[*]`` and the application
    name "simple etl job"; ``getOrCreate`` is what finally returns it.

    Returns:
        The SparkSession produced by the builder.
    """
    # Configure the builder one step at a time instead of one long chain.
    session_builder = SparkSession.builder
    session_builder = session_builder.master("local[*]")
    session_builder = session_builder.appName("simple etl job")
    return session_builder.getOrCreate()

In [3]:
def loadDFWithoutSchema(spark, path=None):
    """Read the autos CSV into a DataFrame without supplying a schema.

    The file is read with the "csv" format and the first line is treated as
    the header row. No schema is passed, so column types are whatever the
    reader chooses by default.

    Args:
        spark: Object exposing the ``read`` DataFrameReader (a SparkSession).
        path: Location of the CSV file. When omitted, the notebook's original
            location ``~/workspace/Spark/Notebooks/data/autos.csv`` is used.

    Returns:
        The DataFrame returned by the reader's ``load`` call.
    """
    if path is None:
        # expanduser resolves the home directory even when the HOME variable
        # is not set, where os.environ["HOME"] would raise KeyError.
        path = os.path.expanduser("~") + "/workspace/Spark/Notebooks/data/autos.csv"

    df = spark.read.format("csv").option("header", "true").load(path)

    return df

In [4]:
def loadDFWithSchema(spark, path=None):
    """Read the autos CSV into a DataFrame using an explicit schema.

    The schema declares all 20 columns as nullable. Timestamps, the creation
    date and the numeric columns get typed fields; everything else, including
    yearOfRegistration, monthOfRegistration and postalCode, stays a string.

    Args:
        spark: Object exposing the ``read`` DataFrameReader (a SparkSession).
        path: Location of the CSV file. When omitted, the notebook's original
            location ``~/workspace/Spark/Notebooks/data/autos.csv`` is used.

    Returns:
        The DataFrame returned by the reader's ``load`` call.
    """
    if path is None:
        # expanduser resolves the home directory even when the HOME variable
        # is not set, where os.environ["HOME"] would raise KeyError.
        path = os.path.expanduser("~") + "/workspace/Spark/Notebooks/data/autos.csv"

    # (column name, Spark type) in the order the columns appear in the file.
    columns = [
        ("dateCrawled", TimestampType()),
        ("name", StringType()),
        ("seller", StringType()),
        ("offerType", StringType()),
        ("price", LongType()),
        ("abtest", StringType()),
        ("vehicleType", StringType()),
        ("yearOfRegistration", StringType()),
        ("gearbox", StringType()),
        ("powerPS", ShortType()),
        ("model", StringType()),
        ("kilometer", LongType()),
        ("monthOfRegistration", StringType()),
        ("fuelType", StringType()),
        ("brand", StringType()),
        ("notRepairedDamage", StringType()),
        ("dateCreated", DateType()),
        ("nrOfPictures", ShortType()),
        ("postalCode", StringType()),
        ("lastSeen", TimestampType()),
    ]
    schema = StructType([StructField(name, dtype, True) for name, dtype in columns])

    df = (
        spark.read
        .format("csv")
        .schema(schema)
        .option("header", "true")
        .load(path)
    )

    return df

In [5]:
def clean_drop_data(df):
    """Drop unused columns and filter the listings down to the rows to load.

    Steps, in order:
      1. drop dateCrawled, nrOfPictures and lastSeen;
      2. keep rows whose seller is not "gewerblich", then drop seller;
      3. keep rows whose offerType is not "Gesuch", then drop offerType.

    NOTE: under SQL comparison semantics a null seller/offerType does not
    satisfy ``!=``, so such rows are filtered out as well.

    Args:
        df: DataFrame holding the raw listings.

    Returns:
        A new DataFrame with the five columns removed and the two filters
        applied.
    """
    seller_ok = col("seller") != "gewerblich"
    offer_ok = col("offerType") != "Gesuch"

    return (
        df.drop("dateCrawled", "nrOfPictures", "lastSeen")
        .where(seller_ok)
        .drop("seller")
        .where(offer_ok)
        .drop("offerType")
    )

In [6]:
def create_table(cursor):
    """Create ``cars_table`` through ``cursor`` if it does not exist yet.

    Only the statement is executed here; committing is left to the caller.

    Args:
        cursor: Database cursor exposing ``execute``.
    """
    # The backslash continuations keep the whole DDL in one string literal.
    ddl = "CREATE TABLE IF NOT EXISTS cars_table \
    (   name VARCHAR(255) NOT NULL, \
        price integer NOT NULL, \
        abtest VARCHAR(255) NOT NULL, \
        vehicleType VARCHAR(255), \
        yearOfRegistration VARCHAR(4) NOT NULL, \
        gearbox VARCHAR(255), \
        powerPS integer NOT NULL, \
        model VARCHAR(255), \
        kilometer integer, \
        monthOfRegistration VARCHAR(255) NOT NULL, \
        fuelType VARCHAR(255), \
        brand VARCHAR(255) NOT NULL, \
        notRepairedDamage VARCHAR(255), \
        dateCreated DATE NOT NULL, \
        postalCode VARCHAR(255) NOT NULL);"

    cursor.execute(ddl)

In [7]:
def write_postgresql(df):
    """Prepare a single multi-row INSERT for ``cars_table`` from ``df``.

    Every row of ``df`` is collected and converted to a plain tuple, and the
    statement receives one ``%s`` placeholder per row. The two return values
    are meant to be passed together to ``cursor.execute(query, rows)``.

    Args:
        df: DataFrame whose ``collect()`` yields the rows to insert, with
            columns in the order listed in the INSERT statement.

    Returns:
        (insert_query, cars_seq): the SQL text and the list of row tuples.
    """
    rows = list(map(tuple, df.collect()))
    placeholders = ','.join('%s' for _ in rows)

    query_template = "INSERT INTO cars_table (name, price, abtest, vehicleType, yearOfRegistration, gearbox, powerPS, \
                        model, kilometer, monthOfRegistration, fuelType, brand, notRepairedDamage, dateCreated, postalCode \
                           ) VALUES {}"

    return query_template.format(placeholders), rows

In [8]:
def get_insterted_data(cursor, limit=2):
    """Print brand, model and price for the first rows of ``cars_table``.

    Args:
        cursor: Database cursor exposing ``execute`` and ``fetchmany``.
        limit: Maximum number of rows to fetch and print (default 2, the
            value that used to be hard-coded).
    """
    postgreSQL_select_Query = "select brand, model, price from cars_table"

    cursor.execute(postgreSQL_select_Query)

    cars_records = cursor.fetchmany(limit)

    # Report how many rows actually came back; the table may hold fewer
    # than `limit` rows.
    print("Printing {} rows".format(len(cars_records)))
    for brand, model, price in cars_records:
        print("Brand = ", brand)
        print("Model = ", model)
        print("Price  = ", price, "\n")

# Execute

The starting point of every Spark application is the creation of a SparkSession. It lives in the driver process, which maintains all relevant information about your Spark application and is responsible for distributing and scheduling its work across the executors.

In [9]:
spark = initialize_Spark()

Read the CSV file

In [10]:
df = loadDFWithSchema(spark)

Clean the dataframe

In [11]:
df_clean = clean_drop_data(df)

Load

In [12]:
# Connection settings are read from the standard libpq environment variables
# so that no credentials are stored in the notebook. The non-secret settings
# fall back to the local development defaults. When PGPASSWORD is unset no
# password is passed and libpq uses its own lookup (for example ~/.pgpass).
conn = psycopg2.connect(
        host = os.environ.get("PGHOST", "localhost"),
        port = os.environ.get("PGPORT", "5432"),
        database = os.environ.get("PGDATABASE", "cars"),
        user = os.environ.get("PGUSER", "postgres"),
        password = os.environ.get("PGPASSWORD"))

A cursor is created from a connection; it is the object through which you send SQL statements to PostgreSQL and read back the results.

In [13]:
# Open a cursor on the connection; the cells below run their SQL through it.
cur = conn.cursor()

In [14]:
# Create cars_table if it does not already exist (committed in the next cell).
create_table(cur)

In [15]:
# Commit the transaction containing the CREATE TABLE statement.
conn.commit()

In [16]:
# Collect the cleaned rows and build one multi-row INSERT statement
# (a single %s placeholder per row).
insert_query, cars_seq = write_postgresql(df_clean)

In [17]:
# Skip the insert when no rows were collected: the statement would end in a
# bare "VALUES" and be rejected. On failure, roll back so the connection is
# not left in an aborted transaction for the following cells.
if cars_seq:
    try:
        cur.execute(insert_query, cars_seq)
    except Exception:
        conn.rollback()
        raise

In [18]:
# Commit the inserted rows.
conn.commit()

In [19]:
# Read back a couple of rows (brand, model, price) from cars_table and print them.
get_insterted_data(cur)

Printing 2 rows
('Brand = ', 'volkswagen')
('Model = ', 'golf')
('Price  = ', 480, '\n')
('Brand = ', 'audi')
('Model = ', None)
('Price  = ', 18300, '\n')


## Tbalbiz (scratch exploration)

In [20]:
# Show the first row of the DataFrame as loaded, before any cleaning.
df.show(1)

+-------------------+----------+------+---------+-----+------+-----------+------------------+-------+-------+-----+---------+-------------------+--------+----------+-----------------+-----------+------------+----------+-------------------+
|        dateCrawled|      name|seller|offerType|price|abtest|vehicleType|yearOfRegistration|gearbox|powerPS|model|kilometer|monthOfRegistration|fuelType|     brand|notRepairedDamage|dateCreated|nrOfPictures|postalCode|           lastSeen|
+-------------------+----------+------+---------+-----+------+-----------+------------------+-------+-------+-----+---------+-------------------+--------+----------+-----------------+-----------+------------+----------+-------------------+
|2016-03-24 11:52:17|Golf_3_1.6|privat|  Angebot|  480|  test|       null|              1993|manuell|      0| golf|   150000|                  0|  benzin|volkswagen|             null| 2016-03-24|           0|     70435|2016-04-07 03:16:57|
+-------------------+----------+------+-

In [21]:
# Show one row whose seller is "gewerblich"; clean_drop_data filters these rows out.
df.where(col("seller")=="gewerblich").show(1)

+-------------------+--------------------+----------+---------+-----+-------+-----------+------------------+-------+-------+------+---------+-------------------+--------+-------+-----------------+-----------+------------+----------+-------------------+
|        dateCrawled|                name|    seller|offerType|price| abtest|vehicleType|yearOfRegistration|gearbox|powerPS| model|kilometer|monthOfRegistration|fuelType|  brand|notRepairedDamage|dateCreated|nrOfPictures|postalCode|           lastSeen|
+-------------------+--------------------+----------+---------+-----+-------+-----------+------------------+-------+-------+------+---------+-------------------+--------+-------+-----------------+-----------+------------+----------+-------------------+
|2016-03-15 18:06:22|Verkaufe_mehrere_...|gewerblich|  Angebot|  100|control|      kombi|              2000|manuell|      0|megane|   150000|                  8|  benzin|renault|             null| 2016-03-15|           0|     65232|2016-04-0

In [22]:
# List the distinct values found in the seller column.
df.select("seller").distinct().show()

+----------+
|    seller|
+----------+
|gewerblich|
|      null|
|    privat|
+----------+

