In [6]:
from IPython.display import display, HTML
display(HTML("<style>pre { white-space: pre !important; }</style>"))
!curl -O https://repo1.maven.org/maven2/org/xerial/sqlite-jdbc/3.34.0/sqlite-jdbc-3.34.0.jar

  % Total    % Received % Xferd  Average Speed   Time    Time     Time  Current
                                 Dload  Upload   Total   Spent    Left  Speed

  0     0    0     0    0     0      0      0 --:--:-- --:--:-- --:--:--     0
  6 7125k    6  446k    0     0   688k      0  0:00:10 --:--:--  0:00:10  688k
100 7125k  100 7125k    0     0  5197k      0  0:00:01  0:00:01 --:--:-- 5200k


In [7]:
import sqlite3
import os
import pyspark.sql.functions as f
from pyspark.sql import SparkSession
from pyspark.sql.types import IntegerType, FloatType, DateType, StringType

In [8]:
# Create connection to the database file and the cursor to manage it 
con = sqlite3.connect(os.path.join('dev','cademycode.db'))
cur = con.cursor()

In [9]:
# Store the table names from the database
table_names = cur.execute('''SELECT name FROM sqlite_master''').fetchall()
print(table_names)

[('cademycode_students',), ('cademycode_courses',), ('cademycode_student_jobs',)]


In [10]:
# Create spark session
spark = SparkSession.builder \
        .appName('EDA') \
        .master('local[6]') \
        .config(
        "spark.jars",
        "{}/sqlite-jdbc-3.34.0.jar".format(os.getcwd())) \
        .config(
        "spark.driver.extraClassPath",
        "{}/sqlite-jdbc-3.34.0.jar".format(os.getcwd())) \
        .getOrCreate()

In [11]:
# Create list with column names
student_columns = ['uuid', 'name', 'birthdate', 
                   'sex', 'contact_info', 'job_id', 
                   'courses_count', 'career_path_id', 'hours_spent']

# Create pyspark dataframe object using the queried information
student_df = spark.createDataFrame(cur.execute('''SELECT * FROM cademycode_students''').fetchall(),student_columns)
student_df.show()

# Show the dtypes which were assigned
print(student_df.dtypes)

+----+--------------------+----------+---+--------------------+------+-------------+--------------+-----------+
|uuid|                name| birthdate|sex|        contact_info|job_id|courses_count|career_path_id|hours_spent|
+----+--------------------+----------+---+--------------------+------+-------------+--------------+-----------+
|   1|     Annabelle Avery|1943-07-03|  F|{"mailing_address...|   7.0|          6.0|           1.0|       4.99|
|   2|         Micah Rubio|1991-02-07|  M|{"mailing_address...|   7.0|          5.0|           8.0|        4.4|
|   3|          Hosea Dale|1989-12-07|  M|{"mailing_address...|   7.0|          8.0|           8.0|       6.74|
|   4|        Mariann Kirk|1988-07-31|  F|{"mailing_address...|   6.0|          7.0|           9.0|      12.31|
|   5|     Lucio Alexander|1963-08-31|  M|{"mailing_address...|   7.0|         14.0|           3.0|       5.64|
|   6|    Shavonda Mcmahon|1989-10-15|  F|{"mailing_address...|   6.0|         10.0|           3.0|     

In [12]:
# Create list with column names
course_columns = ['career_path_id', 'career_path_name', 'hours_to_complete']

# Create pyspark dataframe using the data from the query
course_df = spark.createDataFrame(cur.execute('''SELECT * FROM cademycode_courses''').fetchall(),course_columns)
course_df.show()

print(course_df.dtypes)

+--------------+--------------------+-----------------+
|career_path_id|    career_path_name|hours_to_complete|
+--------------+--------------------+-----------------+
|             1|      data scientist|               20|
|             2|       data engineer|               20|
|             3|        data analyst|               12|
|             4|software engineering|               25|
|             5|    backend engineer|               18|
|             6|   frontend engineer|               20|
|             7|       iOS developer|               27|
|             8|   android developer|               27|
|             9|machine learning ...|               35|
|            10|      ux/ui designer|               15|
+--------------+--------------------+-----------------+

[('career_path_id', 'bigint'), ('career_path_name', 'string'), ('hours_to_complete', 'bigint')]


In [13]:
# Create list with column names
job_columns = ['job_id', 'job_category', 'avg_salary']

# Create PySpark dataframe from query
job_df = spark.createDataFrame(cur.execute('''SELECT * FROM cademycode_student_jobs''').fetchall(),job_columns)
job_df.show()

print(job_df.dtypes)

+------+------------------+----------+
|job_id|      job_category|avg_salary|
+------+------------------+----------+
|     1|         analytics|     86000|
|     2|          engineer|    101000|
|     3|software developer|    110000|
|     4|          creative|     66000|
|     5|financial services|    135000|
|     6|         education|     61000|
|     7|                HR|     80000|
|     8|           student|     10000|
|     9|        healthcare|    120000|
|     0|             other|     80000|
|     3|software developer|    110000|
|     4|          creative|     66000|
|     5|financial services|    135000|
+------+------------------+----------+

[('job_id', 'bigint'), ('job_category', 'string'), ('avg_salary', 'bigint')]


In [14]:
cur.close()
con.close()

The following steps will be needed to clean the data
1. Extract the email and address for each student stores inside the _5 column dictionaries
2. Check for null values in the dataframes
3. Change dtype for the student_df dataframe to the following:
    * birthdate which is a date type format to a datetime dtype
    * Columns 'job_id', 'courses_count', 'career_path_id', 'hours_spent' from string to float
4. Remove duplicates from all dataframes

## student_df transformations
This section will explode the json from the contact_info column into two different columns into a new dataframe named studend_address_df.
From it,4 new columns called street, city, state and zipcode will be extracted.

This will also create a new columned named year which extracts the birthdate year into a new column for easier use in further operations

Aditionally, the missing values will be extracted and stored in a separate dataframe for futher analytics.

Once there are no missing values, the dtypes for each column will be adjusted


In [15]:
# Create a new pyspark dataframe which contains the following:
    # uuid to use in the join
    # mailing address which is extracted from the dictionary
    # email address which is extracted from the dictionary
    
student_address_df = student_df.select(
                student_df.uuid.alias('student_uuid'),
                f.get_json_object(student_df.contact_info, '$.mailing_address').alias("mailing_address"),
                f.get_json_object(student_df.contact_info, '$.email').alias("email_address"))
student_address_df.show()

+------------+--------------------+--------------------+
|student_uuid|     mailing_address|       email_address|
+------------+--------------------+--------------------+
|           1|303 N Timber Key,...|annabelle_avery93...|
|           2|767 Crescent Fair...| rubio6772@hmail.com|
|           3|P.O. Box 41269, S...|hosea_dale8084@co...|
|           4|517 SE Wintergree...|  kirk4005@hmail.com|
|           5|18 Cinder Cliff, ...|alexander9810@hma...|
|           6|P.O. Box 81591, T...|shavonda5863@cold...|
|           7|P.O. Box 53471, O...|bleijenberg188@hm...|
|           8|255 Spring Avenue...|stanford_allan805...|
|           9|997 Dewy Apple, L...|tricia_delacruz66...|
|          10|220 Middle Ridge,...|regenia6908@inloo...|
|          11|818 Clear Street,...|shonda_stephanin4...|
|          12|718 Embers Lane, ...|mcfarland1396@woo...|
|          13|147 SW Plain, Sol...|edwardo8281@inloo...|
|          14|P.O. Box 73926, M...|robena_padilla147...|
|          15|868 Hazy Crossing

In [16]:
# Split the address string into separate parts
student_address_df = student_address_df.withColumn("split_col", f.split(student_address_df["mailing_address"], ","))

# Combine the first three elements of the split_col list into the "street" column
student_address_df = student_address_df.withColumn("street", f.array_join(f.slice(student_address_df["split_col"],1,1),' '))

# Extract the city, state, and zipcode from the split_col list and create separate columns
student_address_df = student_address_df.withColumn("city", f.array_join(f.slice(student_address_df["split_col"],2,1),' '))
student_address_df = student_address_df.withColumn("state", f.array_join(f.slice(student_address_df["split_col"],3,1),' '))
student_address_df = student_address_df.withColumn("zipcode", f.array_join(f.slice(student_address_df["split_col"],4,1),' '))

# Drop the split_col column
student_address_df = student_address_df.drop("split_col")

# Drop the mailing address column
student_address_df = student_address_df.drop("mailing_address")

In [17]:
# Show the resulting dataframe with the student contact information
student_address_df.show()

+------------+--------------------+--------------------+----------------+---------------+-------+
|student_uuid|       email_address|              street|            city|          state|zipcode|
+------------+--------------------+--------------------+----------------+---------------+-------+
|           1|annabelle_avery93...|    303 N Timber Key|        Irondale|      Wisconsin|  84736|
|           2| rubio6772@hmail.com|   767 Crescent Fair|          Shoals|        Indiana|  37439|
|           3|hosea_dale8084@co...|      P.O. Box 41269| St. Bonaventure|       Virginia|  83637|
|           4|  kirk4005@hmail.com|517 SE Wintergree...|            Lane|       Arkansas|  82242|
|           5|alexander9810@hma...|     18 Cinder Cliff|  Doyles borough|   Rhode Island|  73737|
|           6|shavonda5863@cold...|      P.O. Box 81591|  Tarpon Springs|        Montana|  37057|
|           7|bleijenberg188@hm...|      P.O. Box 53471|       Oskaloosa|       Virginia|  85274|
|           8|stanfo

In [18]:
# Join the contact information dataframe to the original dataframe and drop the join column to avoid duplicity
student_df = student_df.join(student_address_df, student_df.uuid == student_address_df.student_uuid).drop('student_uuid')
student_df = student_df.drop('contact_info')
student_df.show()

+----+--------------------+----------+---+------+-------------+--------------+-----------+--------------------+--------------------+--------------------+---------------+-------+
|uuid|                name| birthdate|sex|job_id|courses_count|career_path_id|hours_spent|       email_address|              street|                city|          state|zipcode|
+----+--------------------+----------+---+------+-------------+--------------+-----------+--------------------+--------------------+--------------------+---------------+-------+
|  26|       Doug Browning|1970-06-08|  M|   7.0|         null|           5.0|       1.92| doug7761@inlook.com|      P.O. Box 15845|              Devine|        Florida|  23097|
|  29|      Edgardo Chavez|1946-02-12|  M|   7.0|         12.0|           5.0|      12.98|edgardo9341@wooho...|758 Green Butterf...|     Crescentvillage|          Maine|  81750|
| 474|       Minerva Solis|1982-09-05|  F|   2.0|         15.0|           6.0|       3.55|minerva_solis961@...

In [19]:
# Extract the year from the birthdate and store it as its own value to make it easier to access it for further analytics

student_df = student_df.withColumn('birth_year', f.array_join(f.slice(f.split(student_df.birthdate, '-'),1,1),''))
student_df.show()

+----+--------------------+----------+---+------+-------------+--------------+-----------+--------------------+--------------------+--------------------+---------------+-------+----------+
|uuid|                name| birthdate|sex|job_id|courses_count|career_path_id|hours_spent|       email_address|              street|                city|          state|zipcode|birth_year|
+----+--------------------+----------+---+------+-------------+--------------+-----------+--------------------+--------------------+--------------------+---------------+-------+----------+
|  26|       Doug Browning|1970-06-08|  M|   7.0|         null|           5.0|       1.92| doug7761@inlook.com|      P.O. Box 15845|              Devine|        Florida|  23097|      1970|
|  29|      Edgardo Chavez|1946-02-12|  M|   7.0|         12.0|           5.0|      12.98|edgardo9341@wooho...|758 Green Butterf...|     Crescentvillage|          Maine|  81750|      1946|
| 474|       Minerva Solis|1982-09-05|  F|   2.0|      

In [20]:
# Count the null/nan/missing values in each column
student_df.select([f.count(f.when(f.col(col).isNull() | f.isnan(col),col)).alias(f'{col}_Missing_Count') for col in student_df.columns]).show()

+------------------+------------------+-----------------------+-----------------+--------------------+---------------------------+----------------------------+-------------------------+---------------------------+--------------------+------------------+-------------------+---------------------+------------------------+
|uuid_Missing_Count|name_Missing_Count|birthdate_Missing_Count|sex_Missing_Count|job_id_Missing_Count|courses_count_Missing_Count|career_path_id_Missing_Count|hours_spent_Missing_Count|email_address_Missing_Count|street_Missing_Count|city_Missing_Count|state_Missing_Count|zipcode_Missing_Count|birth_year_Missing_Count|
+------------------+------------------+-----------------------+-----------------+--------------------+---------------------------+----------------------------+-------------------------+---------------------------+--------------------+------------------+-------------------+---------------------+------------------------+
|                 0|                 

In [21]:
# Create a new dataframe with the missing information instead of deleting it so it can be used to look into the reasons for missing data
student_df_missing_info = student_df.exceptAll(student_df.dropna())
student_df_missing_info.select([f.count(f.when(f.col(col).isNull() | f.isnan(col),col)).alias(f'{col}_Missing_Count') for col in student_df_missing_info.columns]).show()

+------------------+------------------+-----------------------+-----------------+--------------------+---------------------------+----------------------------+-------------------------+---------------------------+--------------------+------------------+-------------------+---------------------+------------------------+
|uuid_Missing_Count|name_Missing_Count|birthdate_Missing_Count|sex_Missing_Count|job_id_Missing_Count|courses_count_Missing_Count|career_path_id_Missing_Count|hours_spent_Missing_Count|email_address_Missing_Count|street_Missing_Count|city_Missing_Count|state_Missing_Count|zipcode_Missing_Count|birth_year_Missing_Count|
+------------------+------------------+-----------------------+-----------------+--------------------+---------------------------+----------------------------+-------------------------+---------------------------+--------------------+------------------+-------------------+---------------------+------------------------+
|                 0|                 

In [22]:
# Drop the null/nan/missing values from the dataframe now that they have been stored in another
student_df = student_df.dropna()
student_df.select([f.count(f.when(f.col(col).isNull() | f.isnan(col),col)).alias(f'{col}_Missing_Count') for col in student_df.columns]).show()

+------------------+------------------+-----------------------+-----------------+--------------------+---------------------------+----------------------------+-------------------------+---------------------------+--------------------+------------------+-------------------+---------------------+------------------------+
|uuid_Missing_Count|name_Missing_Count|birthdate_Missing_Count|sex_Missing_Count|job_id_Missing_Count|courses_count_Missing_Count|career_path_id_Missing_Count|hours_spent_Missing_Count|email_address_Missing_Count|street_Missing_Count|city_Missing_Count|state_Missing_Count|zipcode_Missing_Count|birth_year_Missing_Count|
+------------------+------------------+-----------------------+-----------------+--------------------+---------------------------+----------------------------+-------------------------+---------------------------+--------------------+------------------+-------------------+---------------------+------------------------+
|                 0|                 

In [23]:
# Sort the student_df columns and cast them to the right dtype
sorted_columns = ['uuid', 'name', 'birthdate', 
                  'birth_year', 'sex', 'email_address',
                  'street', 'city', 'state', 'zipcode',
                  'job_id', 'courses_count', 'career_path_id',
                  'hours_spent']
column_dtypes = [IntegerType(), StringType(), DateType(),
                 IntegerType(), StringType(), StringType(),
                 StringType(), StringType(), StringType(), IntegerType(),
                 IntegerType(), IntegerType(), IntegerType(),
                 FloatType()]
for key,value in dict(zip(sorted_columns,column_dtypes)).items():
    student_df = student_df.withColumn(key, student_df[key].cast(value))
student_df = student_df.select(*sorted_columns)
student_df.show()
student_df.dtypes

+----+--------------------+----------+----------+---+--------------------+--------------------+--------------------+---------------+-------+------+-------------+--------------+-----------+
|uuid|                name| birthdate|birth_year|sex|       email_address|              street|                city|          state|zipcode|job_id|courses_count|career_path_id|hours_spent|
+----+--------------------+----------+----------+---+--------------------+--------------------+--------------------+---------------+-------+------+-------------+--------------+-----------+
|  29|      Edgardo Chavez|1946-02-12|      1946|  M|edgardo9341@wooho...|758 Green Butterf...|     Crescentvillage|          Maine|  81750|     7|           12|             5|      12.98|
| 474|       Minerva Solis|1982-09-05|      1982|  F|minerva_solis961@...|     782 Common Edge|       Martins Ferry|  West Virginia|  13878|     2|           15|             6|       3.55|
| 191|         Arlen Downs|1945-10-07|      1945|  N|ar

[('uuid', 'int'),
 ('name', 'string'),
 ('birthdate', 'date'),
 ('birth_year', 'int'),
 ('sex', 'string'),
 ('email_address', 'string'),
 ('street', 'string'),
 ('city', 'string'),
 ('state', 'string'),
 ('zipcode', 'int'),
 ('job_id', 'int'),
 ('courses_count', 'int'),
 ('career_path_id', 'int'),
 ('hours_spent', 'float')]

## course_df transformation
The student_df career_path_id values will be inspected to see if these match with those in the course_df. 

Since course_df is a small dataframe with few dimensions, if this check is passed then there will be no need to transform it.

In [24]:
student_df.groupBy('career_path_id').count().orderBy('career_path_id').show()

+--------------+-----+
|career_path_id|count|
+--------------+-----+
|             1|  437|
|             2|  428|
|             3|  442|
|             4|  401|
|             5|  446|
|             6|  432|
|             7|  433|
|             8|  420|
|             9|  417|
|            10|  437|
+--------------+-----+



In [25]:
course_df.show()

+--------------+--------------------+-----------------+
|career_path_id|    career_path_name|hours_to_complete|
+--------------+--------------------+-----------------+
|             1|      data scientist|               20|
|             2|       data engineer|               20|
|             3|        data analyst|               12|
|             4|software engineering|               25|
|             5|    backend engineer|               18|
|             6|   frontend engineer|               20|
|             7|       iOS developer|               27|
|             8|   android developer|               27|
|             9|machine learning ...|               35|
|            10|      ux/ui designer|               15|
+--------------+--------------------+-----------------+



## job_df

Like the course_df, the jobs_df has few data entries. 
During the initial exploration there were some duplicate entries therefore they'll be removed

In [26]:
job_df.groupby('job_id').count().show()

+------+-----+
|job_id|count|
+------+-----+
|     1|    1|
|     2|    1|
|     3|    2|
|     4|    2|
|     6|    1|
|     5|    2|
|     7|    1|
|     8|    1|
|     0|    1|
|     9|    1|
+------+-----+



In [27]:
job_df = job_df.dropDuplicates()
job_df.show()
job_df.dtypes

+------+------------------+----------+
|job_id|      job_category|avg_salary|
+------+------------------+----------+
|     1|         analytics|     86000|
|     2|          engineer|    101000|
|     3|software developer|    110000|
|     4|          creative|     66000|
|     5|financial services|    135000|
|     6|         education|     61000|
|     7|                HR|     80000|
|     8|           student|     10000|
|     9|        healthcare|    120000|
|     0|             other|     80000|
+------+------------------+----------+



[('job_id', 'bigint'), ('job_category', 'string'), ('avg_salary', 'bigint')]

## Create the tables that will be loaded to the new db
The student df columns are the following: 
['uuid', 'name', 'birthdate', 
'birth_year', 'sex', 'email_address',
'street', 'city', 'state', 'zipcode',
'job_id', 'courses_count', 'career_path_id',
'hours_spent']

In order to create a more managable and faster resulting dataframe, they will be split in the following way:

### student_information
1. uuid
2. name
3. job_id
4. career_path_id

### student_details
1. uuid
2. birthdate
3. birth_year
4. sex

### student_studies
1. uuid
2. courses_count
3. hours_spent

### student_contact
1. uuid
2. email_address
3. street
4. city
5. state
6. zipcode

Along the course_df and job_df. The missing_information dataframe will be fully joined and uploaded in a separate table.

In [28]:
# Create the tables in the new database for each dataframe
con = sqlite3.connect(os.path.join('dev','cademycode_updated.db'))
cur = con.cursor()

# Student information
cur.execute('''CREATE TABLE IF NOT EXISTS student_information (
                uuid  INTEGER,
                name TEXT,
                job_id INTEGER,
                career_path_id INTEGER,
                timestamp TEXT)''')
# Student details
cur.execute('''CREATE TABLE IF NOT EXISTS student_details (
                uuid INTEGER,
                birthdate TEXT,
                birth_year INTEGER,
                sex VARCHAR(1))''')
# Student studies
cur.execute('''CREATE TABLE IF NOT EXISTS student_studies (
                uuid INTEGER,
                courses_count INTEGER,
                hours_spent REAL)''')
# Student contact
cur.execute('''CREATE TABLE IF NOT EXISTS student_contact(
                uuid INTEGER,
                email_address TEXT,
                street TEXT,
                city TEXT,
                state TEXT,
                zipcode INTEGER)''')
# Course information
cur.execute('''CREATE TABLE IF NOT EXISTS course_info (
                career_path_id INTEGER,
                career_path_name TEXT,
                hours_to_complete INTEGER)''')
# Job information
cur.execute('''CREATE TABLE IF NOT EXISTS job_info (
                job_id INTEGER,
                job_category TEXT,
                avg_salary INTEGER)''')
# Missing information 
cur.execute('''CREATE TABLE IF NOT EXISTS missing_data_entries (
                uuid INTEGER,
                name TEXT,
                birthdate TEXT,
                birth_year INTEGER,
                sex VARCHAR(1),
                email_address TEXT,
                street TEXT,
                city TEXT,
                state TEXT,
                zipcode INTEGER,
                job_id INTEGER,
                courses_count INTEGER,
                career_path_id INTEGER,
                hours_spent REAL,
                timestamp TEXT)''')
# Confirm tables were created
print(cur.execute('''SELECT name FROM sqlite_master''').fetchall())
# Commit changes
con.commit()
# Close connection
cur.close()
con.close()

[('student_information',), ('student_details',), ('student_studies',), ('student_contact',), ('job_info',), ('missing_data_entries',), ('course_info',)]


In [29]:
mode = 'overwrite'
jdbc_url = 'jdbc:sqlite:dev/cademycode_updated.db'

In [30]:
student_df = student_df.withColumn('loaded_on_date',f.current_timestamp())

student_df.select(*['uuid', 'name', 'job_id', 'career_path_id', 'loaded_on_date']) \
                    .write.jdbc(url=jdbc_url, mode=mode, table='student_information')

In [31]:
student_df.select(*['uuid','birthdate','birth_year','sex'])\
                .write.jdbc(url=jdbc_url, mode=mode, table='student_details')

In [32]:
student_df.select(*['uuid','courses_count','hours_spent'])\
                .write.jdbc(url=jdbc_url, mode=mode, table='student_studies')

In [33]:
student_df.select(*['uuid','email_address','street','city','state','zipcode'])\
                .write.jdbc(url=jdbc_url, mode=mode, table='student_contact')

In [61]:
course_df.select(*course_columns)\
            .write.format('jdbc').option('url',jdbc_url).option('mode','append').option('dbtable','course_info')

<pyspark.sql.readwriter.DataFrameWriter at 0x1fc608b28c8>

In [None]:
job_df.select(*job_columns)\
                .write.jdbc(url=jdbc_url, mode=mode, table='job_info')

In [None]:
student_df_missing_info.select(*sorted_columns)\
                    .withColumn('loaded_on_date',f.current_timestamp()) \
                    .write.jdbc(url=jdbc_url, mode=mode, table='missing_data_entries')