## ETL with Spark for nested JSON

### Import required libraries

In [1]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import explode
import pandas as pd

### Create the Spark session

In [2]:
# A single SparkSession for the whole notebook; getOrCreate() reuses an existing session on re-runs.
spark = (
    SparkSession.builder
    .appName('nestedJSON')
    .getOrCreate()
)

In [7]:
# Load the raw nested employee records through the generic reader with an explicit format.
df = spark.read.format('json').load('employee_data.json')

In [4]:
# Inspect the schema Spark inferred from the JSON: a single `features` array of structs,
# each element holding a nested `candidate` struct.
df.printSchema()

root
 |-- features: array (nullable = true)
 |    |-- element: struct (containsNull = true)
 |    |    |-- candidate: struct (nullable = true)
 |    |    |    |-- experience: string (nullable = true)
 |    |    |    |-- first_name: string (nullable = true)
 |    |    |    |-- last_name: string (nullable = true)
 |    |    |    |-- relocation: string (nullable = true)
 |    |    |    |-- skills: array (nullable = true)
 |    |    |    |    |-- element: string (containsNull = true)
 |    |    |    |-- specialty: string (nullable = true)
 |    |    |    |-- state: string (nullable = true)



### Explode the `features` array and extract the `candidate` struct

In [11]:
# One output row per element of the `features` array; keep only that element's `candidate` struct.
df_flat = (
    df
    .select(explode('features').alias('Elements'))
    .select('Elements.candidate')
)

df_flat.show()

+--------------------+
|           candidate|
+--------------------+
|{Mid, Margaret, M...|
|{Senior, Michael,...|
|{Mid, Brenda, Tyl...|
|{Senior, Joseph, ...|
|{Junior, Laura, W...|
|{Mid, Cheryl, Ram...|
|{Mid, Charles, St...|
|{Senior, Bradley,...|
|{Mid, William, Li...|
|{Senior, Richard,...|
|{Junior, Robert, ...|
|{Mid, Tanya, Schu...|
|{Senior, Scott, N...|
|{Junior, Brett, H...|
|{Junior, Sean, Wi...|
|{Senior, Steven, ...|
|{Mid, Mr., James,...|
|{Junior, Linda, G...|
|{Junior, Jacob, E...|
|{Mid, Nicole, Mar...|
+--------------------+
only showing top 20 rows



### The current format isn't what we need yet: every field is still nested inside the `candidate` struct

In [12]:
# After the explode each row holds one `candidate` struct, but its fields are still nested.
df_flat.printSchema()

root
 |-- candidate: struct (nullable = true)
 |    |-- experience: string (nullable = true)
 |    |-- first_name: string (nullable = true)
 |    |-- last_name: string (nullable = true)
 |    |-- relocation: string (nullable = true)
 |    |-- skills: array (nullable = true)
 |    |    |-- element: string (containsNull = true)
 |    |-- specialty: string (nullable = true)
 |    |-- state: string (nullable = true)



In [13]:
# Explode the candidate structs directly, then promote each struct field to a top-level column.
# The column order is spelled out instead of following the struct's own field order
# (experience, first_name, ...).
df_flat1 = df.select(explode('features.candidate').alias('candi')).select(
    *[
        f'candi.{field}'
        for field in ('first_name', 'last_name', 'experience', 'relocation',
                      'specialty', 'skills', 'state')
    ]
)
df_flat1.show()

+----------+---------+----------+----------+------------------+--------------------+-----+
|first_name|last_name|experience|relocation|         specialty|              skills|state|
+----------+---------+----------+----------+------------------+--------------------+-----+
|  Margaret| Mcdonald|       Mid|        no|          Database|[skLearn, Java, R...|   AL|
|   Michael|   Carter|    Senior|       yes|        Statistics|[TensorFlow, R, S...|   AR|
|    Brenda|    Tyler|       Mid|        no|          Database|             [Spark]|   UT|
|    Joseph|     King|    Senior|     maybe|  Machine Learning|[skLearn, SQL, R,...|   FL|
|     Laura|     Webb|    Junior|     maybe|  Machine Learning|[TensorFlow, C++,...|   WY|
|    Cheryl|  Ramirez|       Mid|        no|Data Visualization|[C++, Python, R, ...|   OK|
|   Charles|  Stewart|       Mid|     maybe|  Machine Learning|[MongoDB, C++, Ja...|   NM|
|   Bradley| Peterson|    Senior|       yes|Data Visualization|[skLearn, MongoDB...|   TX|

### We have reached the final, flat format

In [14]:
# Every candidate field is now a top-level column; `skills` remains an array of strings.
df_flat1.printSchema()

root
 |-- first_name: string (nullable = true)
 |-- last_name: string (nullable = true)
 |-- experience: string (nullable = true)
 |-- relocation: string (nullable = true)
 |-- specialty: string (nullable = true)
 |-- skills: array (nullable = true)
 |    |-- element: string (containsNull = true)
 |-- state: string (nullable = true)



### Save the dataset as a CSV file

In [8]:
import os

# Remove HADOOP_HOME from this process's environment (no-op when it is not set).
# NOTE(review): presumably a workaround for a local Hadoop/Spark installation issue — confirm.
if 'HADOOP_HOME' in os.environ:
    del os.environ['HADOOP_HOME']

# Export through pandas rather than Spark's own writer: collect the flat frame, write one CSV file.
df_flat1.toPandas().to_csv(path_or_buf='employee_data.csv', index=False)


## Upload data into PostgreSQL

In [15]:
import psycopg2

In [None]:
def connect():
    """Open a psycopg2 connection to the PostgreSQL server.

    Connection settings come from the standard libpq environment variables
    (PGDATABASE, PGUSER, PGPASSWORD, PGHOST, PGPORT). Each variable falls back to the
    value this notebook previously hardcoded, so behaviour is unchanged when none is set.

    Returns
    -------
    psycopg2 connection, or None if connecting failed (the error is printed).
    """
    import os  # local import so this cell does not depend on an earlier cell

    try:
        conn = psycopg2.connect(
            dbname=os.environ.get("PGDATABASE", "postgres"),
            user=os.environ.get("PGUSER", "postgres"),
            # Local-development fallback only: set PGPASSWORD instead of relying on it.
            password=os.environ.get("PGPASSWORD", "root"),
            host=os.environ.get("PGHOST", "127.0.0.1"),
            port=os.environ.get("PGPORT", "5432"),
        )
        return conn
    except psycopg2.Error as e:
        print("Unable to connect to the database.")
        print(e)
        return None

### Function to create table

In [None]:
def create_table(conn):
    """Create the ``employee_data`` table if it does not already exist.

    Parameters
    ----------
    conn : psycopg2 connection
        Open connection. The DDL is committed on success.

    On ``psycopg2.Error`` the transaction is rolled back and the error is printed
    rather than re-raised, matching the notebook's print-and-continue error style.
    """
    try:
        # The context manager closes the cursor even when execute() raises.
        with conn.cursor() as cur:
            cur.execute("""
                CREATE TABLE IF NOT EXISTS employee_data (
                    first_name TEXT,
                    last_name TEXT,
                    skills TEXT[],
                    state TEXT,
                    specialty TEXT,
                    experience TEXT,
                    relocation TEXT
                );
            """)
        conn.commit()
    except psycopg2.Error as e:
        # Without a rollback the connection stays in an aborted transaction and every
        # later statement on it (e.g. the inserts) fails as well.
        conn.rollback()
        print("Error creating table.")
        print(e)

### Function to insert data

In [None]:
def insert_data(conn, data):
    """Insert one employee record into ``employee_data`` and commit it.

    Parameters
    ----------
    conn : psycopg2 connection
        Open connection.
    data : sequence of 7 values
        In the order (first_name, last_name, skills, state, specialty, experience,
        relocation). ``skills`` may be any iterable of strings (list, tuple, array)
        or None.

    On ``psycopg2.Error`` the transaction is rolled back and the error is printed.
    """
    skills = data[2]
    # Pass a plain Python list and let psycopg2 adapt it to a PostgreSQL ARRAY.
    # Hand-building "{'a', 'b'}" stored the quote characters inside every element and
    # broke on skills containing commas or braces. str() normalises str subclasses
    # (e.g. numpy.str_) while NULL elements stay NULL.
    skills_list = None if skills is None else [
        None if skill is None else str(skill) for skill in skills
    ]
    try:
        with conn.cursor() as cur:
            cur.execute("""
                INSERT INTO employee_data (
                    first_name,
                    last_name,
                    skills,
                    state,
                    specialty,
                    experience,
                    relocation
                ) VALUES (%s, %s, %s, %s, %s, %s, %s);
            """, (data[0], data[1], skills_list, data[3], data[4], data[5], data[6]))
        conn.commit()
    except psycopg2.Error as e:
        # Roll back so the connection remains usable for the next record.
        conn.rollback()
        print("Error inserting data.")
        print(e)


### main function

In [None]:
def main(rows=None):
    """Load the flattened employee records into PostgreSQL.

    Parameters
    ----------
    rows : iterable, optional
        Records that support key access by column name (e.g. pyspark ``Row`` objects
        or dicts). Defaults to ``df_flat1.collect()``, the flattened Spark DataFrame
        built earlier in this notebook.
    """
    conn = connect()
    if conn is None:
        return
    try:
        create_table(conn)
        # Iterate the *flattened* frame: the raw ``df`` is a Spark DataFrame (no
        # ``iterrows``) whose only top-level column is the nested ``features`` array.
        if rows is None:
            rows = df_flat1.collect()
        for row in rows:
            data = (row['first_name'], row['last_name'], row['skills'], row['state'],
                    row['specialty'], row['experience'], row['relocation'])
            insert_data(conn, data)
    finally:
        # Close the connection even if a non-psycopg2 error escapes the loop.
        conn.close()

if __name__ == "__main__":
    main()