# Adventure Works Database
**TODO:**
 * Make a connection from PySpark to a local SQL Server to retrieve data - AdventureWorks 2017


## Data Diagram
![DIAGRAM](adventureworks_person.PNG)

Source: https://dataedo.com/download/AdventureWorks.pdf

## Data Retrieval (Extract)

The data is stored in a local SQL Server instance. I retrieve it over a JDBC connection, authenticating with a user ID and password that are kept in a separate file.

 * Make connection
 * Retrieve data
 * Present data

***Other notes***
 * Make sure to check your Java version so that you use the matching Microsoft SQL Server JDBC jar file (this notebook uses `mssql-jdbc-8.2.2.jre8.jar`, the Java 8 build)
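
The connection settings used in this notebook can be gathered in one place before any Spark call is made. The sketch below is a hypothetical helper (`build_jdbc_options` is not part of the notebook or of PySpark); it only assembles the SQL Server JDBC URL and driver class that appear in the cells that follow, and the user name and password are placeholders.

```python
def build_jdbc_options(database, user, password, host="localhost", port=1433):
    """Return a dict of options for Spark's JDBC reader (SQL Server).

    Hypothetical helper: the host, port and driver class mirror the values
    used elsewhere in this notebook.
    """
    return {
        "url": f"jdbc:sqlserver://{host}:{port};databaseName={database}",
        "user": user,
        "password": password,
        "driver": "com.microsoft.sqlserver.jdbc.SQLServerDriver",
    }


# Placeholder credentials, for illustration only.
options = build_jdbc_options("AdventureWorks2017", "example_user", "example_password")
print(options["url"])
```

With a running session, a dict like this could be passed to the reader as `spark.read.format("jdbc").options(**options).option("dbtable", "Person.Person").load()`.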

In [1]:
import pyodbc 


import pandas as pd
from tqdm import tqdm

In [3]:
import pyspark

from pyspark.sql import SparkSession
from pyspark.sql.functions import col,length,trim
from pyspark import SparkContext, SparkConf, SQLContext

In [4]:
import os
def find():
    spark_home = os.environ.get('SPARK_HOME', None)
    print(spark_home)

    if not spark_home:
        for path in [
            '/usr/local/opt/apache-spark/libexec', # OS X Homebrew
            '/usr/lib/spark/' # AWS Amazon EMR
            # Any other common places to look?
        ]:
            if os.path.exists(path):
                spark_home = path
                break

    if not spark_home:
        raise ValueError("Couldn't find Spark, make sure SPARK_HOME env is set"
                         " or Spark is in an expected location (e.g. from homebrew installation).")

    return spark_home

In [5]:
print( find() )

C:\spark
C:\spark


In [6]:
appName = "PySpark SQL Server - JDBC"
master = "local"
conf = SparkConf() \
    .setAppName( appName ) \
    .setMaster( master ) \
    .set( "spark.driver.extraClassPath", "mssql-jdbc-8.2.2.jre8.jar" )


In [7]:
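# Set the flag as an environment variable; a bare Python assignment is not visible to PySpark
os.environ["PYSPARK_ALLOW_INSECURE_GATEWAY"] = "1"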

In [8]:
sc = SparkContext( conf = conf )
sqlContext = SQLContext( sc )




In [9]:
spark = sqlContext.sparkSession

In [10]:
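import findspark
# Request the SQL Server JDBC driver rather than the MySQL connector.
# Note: this only affects Spark contexts created after this call.
findspark.add_packages( 'com.microsoft.sqlserver:mssql-jdbc:8.2.2.jre8' )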

In [11]:
# C:\Users\hkpar
filename = "C:/Users/hkpar/authentication/mysql.txt"
with open( filename ) as f:
    lines = f.read().splitlines() 

if( len( lines ) == 2 ):
    user = lines[0]
    password = lines[1]
else:
    print( 'USERNAME and PASSWORD CORRUPTION ERROR' )
    raise ValueError( 'USERNAME and PASSWORD CORRUPTION ERROR' )
# f.close()
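
A slightly more defensive way to read the two-line credentials file is sketched below. `load_credentials` is a hypothetical name, and the sketch assumes that surrounding whitespace and blank lines are never part of the user name or password. The demonstration writes a throwaway file with made-up values instead of touching the real one.

```python
import os
import tempfile


def load_credentials(path):
    """Read a two-line credentials file: user name first, then password."""
    with open(path, encoding="utf-8") as f:
        # Strip whitespace and skip blank lines (for example a trailing newline).
        lines = [line.strip() for line in f if line.strip()]
    if len(lines) != 2:
        raise ValueError(f"Expected 2 non-empty lines in {path}, found {len(lines)}")
    return lines[0], lines[1]


# Demonstration with a temporary file and made-up values.
with tempfile.NamedTemporaryFile("w", suffix=".txt", delete=False) as tmp:
    tmp.write("example_user\nexample_password\n\n")
demo_user, demo_password = load_credentials(tmp.name)
os.remove(tmp.name)
print(demo_user)
```

Raising inside the helper keeps the error message specific (it reports how many lines were found), which may make a malformed file easier to diagnose.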

In [12]:
database = "AdventureWorks2017"

In [13]:
def getTable( table ):
    """
    Return the named SQL Server table as a Spark DataFrame.

    Parameters: table (String), schema-qualified name such as "Person.Person"
    Return: DataFrame (Spark)
    """
    df_table = spark.read.format( "jdbc" ) \
        .option( "url", f"jdbc:sqlserver://localhost:1433;databaseName={database}" ) \
        .option( "dbtable", table ) \
        .option( "user", user ) \
        .option( "password", password ) \
        .option( "driver", "com.microsoft.sqlserver.jdbc.SQLServerDriver" ) \
        .load()

    return df_table

In [14]:
df_p = getTable( "Person.Person" )
df_pPhone = getTable( "Person.PersonPhone" )
df_pEmail = getTable( "Person.EmailAddress" )

In [15]:
# show() prints the table itself and returns None, so it is not wrapped in print()
df_p.show()
df_pPhone.show()
df_pEmail.show()

+----------------+----------+---------+-----+---------+----------+----------+------+--------------+---------------------+--------------------+--------------------+-------------------+
|BusinessEntityID|PersonType|NameStyle|Title|FirstName|MiddleName|  LastName|Suffix|EmailPromotion|AdditionalContactInfo|        Demographics|             rowguid|       ModifiedDate|
+----------------+----------+---------+-----+---------+----------+----------+------+--------------+---------------------+--------------------+--------------------+-------------------+
|               1|        EM|    false| null|      Ken|         J|   Sánchez|  null|             0|                 null|<IndividualSurvey...|92C4279F-1207-48A...|2009-01-07 00:00:00|
|               2|        EM|    false| null|    Terri|       Lee|     Duffy|  null|             1|                 null|<IndividualSurvey...|D8763459-8AA8-47C...|2008-01-24 00:00:00|
|               3|        EM|    false| null|  Roberto|      null|Tamburello|  n

## Preprocessing (Transform)

It is important to check the data before making it accessible to others. Data can be corrupted, duplicated, unnecessary, or incomplete, so it should pass through several checks before it is treated as clean.

 * Check for nulls
 * Check for repetitions
 * Check for consistency
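
To make the first check concrete, here is a plain-Python illustration of counting NULLs per column on a small in-memory sample. It is a sketch of the logic only, not the Spark code used in this notebook; `count_nulls` is a hypothetical helper, and the three rows are reduced copies of rows shown in the Person table output.

```python
def count_nulls(rows, columns):
    """Count missing (None) values per column in a list of dict rows."""
    counts = {column: 0 for column in columns}
    for row in rows:
        for column in columns:
            if row.get(column) is None:
                counts[column] += 1
    return counts


# Reduced sample: only four of the Person columns are kept.
sample = [
    {"BusinessEntityID": 1, "Title": None, "FirstName": "Ken", "MiddleName": "J"},
    {"BusinessEntityID": 3, "Title": None, "FirstName": "Roberto", "MiddleName": None},
    {"BusinessEntityID": 5, "Title": "Ms.", "FirstName": "Gail", "MiddleName": "A"},
]

null_counts = count_nulls(sample, ["BusinessEntityID", "Title", "FirstName", "MiddleName"])
print(null_counts)
```

A per-column summary like this shows at a glance which columns are optional (here `Title` and `MiddleName`) and which should never be empty.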

In [16]:
# Show rows of the Person table where Title is not NULL
df_p.filter( df_p.Title.isNotNull() ).show()

+----------------+----------+---------+-----+---------+----------+-----------+------+--------------+---------------------+--------------------+--------------------+--------------------+
|BusinessEntityID|PersonType|NameStyle|Title|FirstName|MiddleName|   LastName|Suffix|EmailPromotion|AdditionalContactInfo|        Demographics|             rowguid|        ModifiedDate|
+----------------+----------+---------+-----+---------+----------+-----------+------+--------------+---------------------+--------------------+--------------------+--------------------+
|               5|        EM|    false|  Ms.|     Gail|         A|   Erickson|  null|             0|                 null|<IndividualSurvey...|F3A3F6B4-AE3B-430...| 2007-12-30 00:00:00|
|               6|        EM|    false|  Mr.|   Jossef|         H|   Goldberg|  null|             0|                 null|<IndividualSurvey...|0DEA28FD-EFFE-482...| 2013-12-16 00:00:00|
|              13|        EM|    false|  Ms.|   Janice|         M|    

In [17]:
# Show rows of the Person table where Title is NULL

df_p.filter( df_p.Title.isNull() ).show()

+----------------+----------+---------+-----+---------+----------+-----------------+------+--------------+---------------------+--------------------+--------------------+-------------------+
|BusinessEntityID|PersonType|NameStyle|Title|FirstName|MiddleName|         LastName|Suffix|EmailPromotion|AdditionalContactInfo|        Demographics|             rowguid|       ModifiedDate|
+----------------+----------+---------+-----+---------+----------+-----------------+------+--------------+---------------------+--------------------+--------------------+-------------------+
|               1|        EM|    false| null|      Ken|         J|          Sánchez|  null|             0|                 null|<IndividualSurvey...|92C4279F-1207-48A...|2009-01-07 00:00:00|
|               2|        EM|    false| null|    Terri|       Lee|            Duffy|  null|             1|                 null|<IndividualSurvey...|D8763459-8AA8-47C...|2008-01-24 00:00:00|
|               3|        EM|    false| null|

In [18]:
# Show phone numbers longer than 12 characters, i.e. longer than a 10-digit number written as NNN-NNN-NNNN
df_pPhone.filter( length(col( "PhoneNumber" )) > 12).show()

+----------------+-------------------+-----------------+--------------------+
|BusinessEntityID|        PhoneNumber|PhoneNumberTypeID|        ModifiedDate|
+----------------+-------------------+-----------------+--------------------+
|             286|1 (11) 500 555-0190|                1| 2013-05-23 00:00:00|
|             288|1 (11) 500 555-0140|                1| 2013-05-23 00:00:00|
|             289|1 (11) 500 555-0145|                3| 2012-05-23 00:00:00|
|             290|1 (11) 500 555-0117|                1| 2012-05-23 00:00:00|
|             299|1 (11) 500 555-0132|                1|2015-04-15 16:33:...|
|             313|1 (11) 500 555-0198|                1| 2013-06-30 00:00:00|
|             321|1 (11) 500 555-0150|                1| 2012-07-31 00:00:00|
|             349|1 (11) 500 555-0190|                1| 2013-07-31 00:00:00|
|             357|1 (11) 500 555-0181|                1| 2013-06-30 00:00:00|
|             377|1 (11) 500 555-0139|                3| 2013-06
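
The length filter above separates two visibly different phone formats. As a sketch of a stricter consistency check, the plain-Python example below classifies numbers by pattern. The two regular expressions and the labels `domestic` and `international` are assumptions inferred only from the sample values shown in this notebook, not a documented AdventureWorks rule; the last sample value is made up to show the fallback case.

```python
import re
from collections import Counter

# Assumed patterns, inferred from the sample output only.
DOMESTIC = re.compile(r"^\d{3}-\d{3}-\d{4}$")                   # e.g. 793-555-0179
INTERNATIONAL = re.compile(r"^1 \(\d{2}\) \d{3} \d{3}-\d{4}$")  # e.g. 1 (11) 500 555-0190


def classify_phone(number):
    """Label a phone number by the format it matches, or 'other'."""
    if DOMESTIC.match(number):
        return "domestic"
    if INTERNATIONAL.match(number):
        return "international"
    return "other"


numbers = ["793-555-0179", "1 (11) 500 555-0190", "555 0100"]
format_counts = Counter(classify_phone(n) for n in numbers)
print(format_counts)
```

Any value that lands in `other` would be a candidate for manual review, which a length comparison alone cannot flag.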

In [19]:
# Get Max value of a column
df_pPhone.agg({'PhoneNumberTypeID': 'max'}).show()

+----------------------+
|max(PhoneNumberTypeID)|
+----------------------+
|                     3|
+----------------------+



In [20]:
# Get Min value of a column
df_pPhone.agg({'PhoneNumberTypeID': 'min'}).show()

+----------------------+
|min(PhoneNumberTypeID)|
+----------------------+
|                     1|
+----------------------+



In [21]:
# Confirm that the minimum is plausible: no PhoneNumberTypeID is less than zero
df_pPhone.filter( col( "PhoneNumberTypeID" ) < 0).show()

+----------------+-----------+-----------------+------------+
|BusinessEntityID|PhoneNumber|PhoneNumberTypeID|ModifiedDate|
+----------------+-----------+-----------------+------------+
+----------------+-----------+-----------------+------------+



## Joins

Here I use an inner join to combine two tables, Person.PersonPhone and Person.EmailAddress.

##### Potential Problems:
 * Duplicated columns - in this example, BusinessEntityID and ModifiedDate appear in both tables
 * NULL values - an inner join keeps only rows that match in both tables, so it introduces no new NULLs; if both tables have already been NULL-checked, the result should contain none.

##### Solution:
 * Check whether the two tables share column names before joining
 * Drop one copy of each shared column
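
The first step can be sketched in plain Python on the column lists alone. The helper below (`plan_join_renames`, a hypothetical name) finds the columns that clash and proposes suffixed names for the right-hand table. Renaming is an alternative to dropping, shown for comparison; it is not the approach taken in the cells that follow.

```python
def plan_join_renames(left_cols, right_cols, join_keys, suffix="_right"):
    """Map clashing right-hand column names to suffixed names.

    Join keys are left out because they are expected to be equal on both
    sides and can simply be dropped from one table after the join.
    """
    shared = set(left_cols) & set(right_cols)
    return {
        name: name + suffix
        for name in right_cols
        if name in shared and name not in join_keys
    }


phone_cols = ["BusinessEntityID", "PhoneNumber", "PhoneNumberTypeID", "ModifiedDate"]
email_cols = ["BusinessEntityID", "EmailAddressID", "EmailAddress", "rowguid", "ModifiedDate"]

renames = plan_join_renames(phone_cols, email_cols, join_keys=["BusinessEntityID"])
print(renames)
```

Each pair in the result could then be applied with `withColumnRenamed(old, new)` on the right-hand DataFrame before joining, which keeps both `ModifiedDate` values instead of discarding one.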

In [22]:
sameCols = set( df_pPhone.columns ).intersection( set( df_pEmail.columns ))
sameCols

{'BusinessEntityID', 'ModifiedDate'}

In [23]:
# Inner join on both shared columns, then drop the copies that come from df_pEmail
join_condition = ( df_pPhone.BusinessEntityID == df_pEmail.BusinessEntityID ) & \
                 ( df_pPhone.ModifiedDate == df_pEmail.ModifiedDate )
df_pPhoneEmail = df_pPhone.join( df_pEmail, join_condition, "inner" ) \
    .drop( df_pEmail.BusinessEntityID ) \
    .drop( df_pEmail.ModifiedDate )

df_pPhoneEmail.show( truncate = False )

+----------------+-------------------+-----------------+-------------------+--------------+------------------------------+------------------------------------+
|BusinessEntityID|PhoneNumber        |PhoneNumberTypeID|ModifiedDate       |EmailAddressID|EmailAddress                  |rowguid                             |
+----------------+-------------------+-----------------+-------------------+--------------+------------------------------+------------------------------------+
|196             |793-555-0179       |1                |2008-12-17 00:00:00|196           |shammi0@adventure-works.com   |F62FC889-EADF-48C4-B4C2-3DEFF636A677|
|274             |238-555-0197       |1                |2010-12-28 00:00:00|274           |stephen0@adventure-works.com  |FAFCED5B-FF48-4397-855A-ADEE1A5136DE|
|1746            |484-555-0100       |1                |2013-03-27 00:00:00|1042          |alan3@adventure-works.com     |E06ACF96-2D52-4476-93BE-3450605D1CD7|
|1999            |874-555-0100       |1 

#### Convert to Parquet
 * Convert the given Spark dataframe into parquet

In [24]:
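# mode( "overwrite" ) lets this cell be re-run; the default mode fails if the path already exists
df_pPhoneEmail.write.mode( "overwrite" ).parquet( "peoplePhoneEmail.parquet" )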
parquetDF = spark.read.parquet( "peoplePhoneEmail.parquet" )

In [26]:
parquetDF.show()

+----------------+-------------------+-----------------+-------------------+--------------+--------------------+--------------------+
|BusinessEntityID|        PhoneNumber|PhoneNumberTypeID|       ModifiedDate|EmailAddressID|        EmailAddress|             rowguid|
+----------------+-------------------+-----------------+-------------------+--------------+--------------------+--------------------+
|             196|       793-555-0179|                1|2008-12-17 00:00:00|           196|shammi0@adventure...|F62FC889-EADF-48C...|
|             274|       238-555-0197|                1|2010-12-28 00:00:00|           274|stephen0@adventur...|FAFCED5B-FF48-439...|
|            1746|       484-555-0100|                1|2013-03-27 00:00:00|          1042|alan3@adventure-w...|E06ACF96-2D52-447...|
|            1999|       874-555-0100|                1|2013-07-31 00:00:00|          1195|filomena0@adventu...|6E8BEC0E-AE8E-422...|
|            2006|       195-555-0100|                3|2012-0

In [27]:
# Duplicate check: compare the row count before and after dropping fully duplicated rows.
# To check specific columns only, pass a list of column names to dropDuplicates().
if df_p.count() > df_p.dropDuplicates( df_p.columns ).count():
    print( 'There are Duplicates' )
    raise ValueError('Data has duplicates')
else:
    print( 'There are NO Duplicates' )

There are NO Duplicates


In [28]:
def checkDuplicates( df ):
    """
    Check whether the given dataframe contains fully duplicated rows,
    i.e. rows in which every column value is repeated.

    Parameters: dataframe (Spark)
    Return: False when there are no duplicates
    Raises: ValueError when duplicates are found
    """
    if df.count() > df.dropDuplicates( df.columns ).count():
        print( 'There are Duplicates' )
        raise ValueError( 'Data has duplicates' )
    else:
        print( 'There are NO Duplicates' )
        return False


In [29]:
def checkDuplicatesByCol( df, listOfCols ):
    """
    Check whether the given dataframe contains rows that repeat the same
    values in the given columns.

    Parameters: dataframe (Spark), list of column names
    Return: False when there are no duplicates
    Raises: ValueError when duplicates are found
    """
    if df.count() > df.dropDuplicates( listOfCols ).count():
        print( 'There are Duplicates' )

        # Show each duplicated value combination with its count, most frequent first
        df.groupby( listOfCols ) \
            .count() \
            .where( 'count > 1' ) \
            .sort( 'count', ascending = False ) \
            .show()

        raise ValueError( 'Data has duplicates' )
    else:
        print( 'There are NO Duplicates' )
        return False

In [30]:
if checkDuplicates( df_p ):
    print( 'Person Table has duplicates' )
else:
    print( 'Person Table Passed' )

cols = [
    'FirstName'
]
if checkDuplicatesByCol( df_p, cols ):
    print( 'Person Table has duplicates' )
else:
    print( 'Person Table Passed' )

There are NO Duplicates
Person Table Passed
There are Duplicates
+---------+-----+
|FirstName|count|
+---------+-----+
|  Richard|  103|
|Katherine|   99|
|    James|   97|
|   Marcus|   97|
| Jennifer|   96|
|    Lucas|   93|
|   Dalton|   93|
|Alexandra|   93|
| Isabella|   92|
|     Seth|   92|
|   Morgan|   92|
|  Natalie|   91|
|  Kaitlyn|   90|
|  Eduardo|   90|
|   Sydney|   90|
|   Robert|   90|
|      Ian|   89|
|    Julia|   89|
|   Xavier|   88|
|    Chloe|   88|
+---------+-----+
only showing top 20 rows

None


ValueError: Data has duplicates

#### Extra Notes
 * The duplicate checks above raise a ValueError on purpose: the failure on `FirstName` confirms that the check detects duplicates
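
When stopping the notebook is not desirable, a check can report the offending values and leave the decision to the caller. The plain-Python sketch below illustrates that design on a small made-up sample; `find_duplicates` is a hypothetical helper and works on a list of dict rows, not on a Spark DataFrame.

```python
from collections import Counter


def find_duplicates(rows, key_columns):
    """Return {key tuple: count} for key values that occur more than once."""
    counts = Counter(tuple(row[column] for column in key_columns) for row in rows)
    return {key: n for key, n in counts.items() if n > 1}


# Made-up rows, for illustration only.
people = [
    {"BusinessEntityID": 1, "FirstName": "Richard"},
    {"BusinessEntityID": 2, "FirstName": "Katherine"},
    {"BusinessEntityID": 3, "FirstName": "Richard"},
]

by_id = find_duplicates(people, ["BusinessEntityID"])
by_name = find_duplicates(people, ["FirstName"])
print(by_id)
print(by_name)
```

An empty result means the chosen columns identify each row uniquely; a non-empty one lists exactly which values repeat, so a pipeline could log them, filter them, or raise as the functions above do.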