In [1]:
#importing modules
from pyspark.sql import SparkSession
from pyspark.sql.functions import col,sha2

In [2]:
#creating a spark session
#postgres driver
postgres_driver_path = "/home/jovyan/work/postgresql-42.2.27.jre7.jar"
spark = SparkSession.builder.appName("pipeline").config("spark.jars", postgres_driver_path).config("spark.driver.extraClassPath", postgres_driver_path).config("spark.executor.extraClassPath", postgres_driver_path).getOrCreate()

In [3]:
#reading data
file_path = "./users.csv"
df = spark.read.csv(file_path,header=True)

In [7]:
#typecasting the id column from string to int
try:
    df = df.withColumn("id",col("id").cast("int"))
except:
    print("error converting id's to int")

In [5]:
#adding a new column with hashed email address
df = df.withColumn("hashed_email",sha2(col("email"), 256))

In [6]:
df.printSchema()

root
 |-- id: integer (nullable = true)
 |-- first_name: string (nullable = true)
 |-- last_name: string (nullable = true)
 |-- email: string (nullable = true)
 |-- hashed_email: string (nullable = true)



In [7]:

#writing to a csv file
df.write.csv("results.csv")

AnalysisException: [PATH_ALREADY_EXISTS] Path file:/home/jovyan/work/results.csv already exists. Set mode as "overwrite" to overwrite the existing path.

In [8]:
#setting up postgres
#instead of localhost we need to use the name of the container as localhost would create confusion during connection
postgres_url = "jdbc:postgresql://postgres:5432/etl_pipeline"
table_name = "users"
postgres_properties = {
    "user": "postgres",
    "password": "postgres",
    "driver": "org.postgresql.Driver"
}

In [9]:
#writing to postgres
df.write \
    .format("jdbc") \
    .option("url", postgres_url) \
    .option("dbtable", table_name) \
    .option("user", postgres_properties["user"]) \
    .option("password", postgres_properties["password"]) \
    .option("driver", postgres_properties["driver"]) \
    .mode("overwrite") \
    .save()

In [10]:
#stop the session
spark.stop()