### Project Prerequisites
#### Listed below are the prerequisites for completing the task(s) in this notebook
- Install git on your PC
- Install Python (Version 3.9 or above)
- Setup PySpark on your local machine
- Download the JDBC driver for your RDBMS (e.g. Postgres, MySQL, Oracle, etc.) to your local machine
- Move the driver's JAR file downloaded above into the `jars` directory of your Spark installation

### Project Assumption(s)
- You are running on a Windows PC

In [43]:
pip list

Package                    VersionNote: you may need to restart the kernel to use updated packages.


You should consider upgrading via the 'C:\Users\abiodun.sanni\AppData\Local\Programs\Python\Python39\python.exe -m pip install --upgrade pip' command.



-------------------------- -----------
agate                      1.6.1
altair                     4.2.0
anyio                      3.5.0
argon2-cffi                21.3.0
argon2-cffi-bindings       21.2.0
asn1crypto                 1.4.0
asttokens                  2.0.5
attrs                      21.2.0
azure-core                 1.21.1
azure-identity             1.7.1
Babel                      2.9.1
backcall                   0.2.0
black                      21.12b0
bleach                     4.1.0
certifi                    2021.10.8
cffi                       1.15.0
charset-normalizer         2.0.9
click                      8.0.3
colorama                   0.4.4
cryptography               36.0.1
dbt-core                   1.0.0
dbt-extractor              0.4.0
dbt-postgres               1.0.0
dbt-sqlserver              0.21.1
debugpy                    1.5.1
decorator                  5.1.1
defusedxml                 0.7.1
entrypoints                0.3
et-xmlfile               

In [1]:
### Importing necessary libraries
from pyspark.sql import SparkSession
from pyspark import SparkContext, SparkConf, SQLContext
from pyspark.sql import functions as f
from sqlalchemy import create_engine
from getpass import getpass as gp
import os
# from lib.logger import Log4j

In [2]:
from time import perf_counter

In [3]:
from sqlalchemy import create_engine

#### Before running the next cell, open a command prompt and run the following commands
- cd C:/
- mkdir PySpark_Demo
- cd PySpark_Demo
- git clone https://github.com/Ability014/Data-Engineering-101.git
- pip install openpyxl

#### Switching directory to the new directory

In [None]:
os.chdir('C:\\PySpark_Demo\\Data-Engineering-101')

In [None]:
pwd

#### Reading the file from the folder above with pandas, since it is in .xlsx format

In [None]:
import pandas as pd
import openpyxl

In [None]:
import numpy as np

#### Reading the excel file using Pandas

In [None]:
start = perf_counter()
df = pd.read_excel('AssignmentDump.xlsx')
end = perf_counter()
### Duration of data reading
print(f'This job ran for {end - start} seconds') # Duration in seconds

In [None]:
df.head()

In [None]:
df.count()

#### Replacing Missing Values

In [None]:
# Treat missing discount/price amounts as zero.
# The result is assigned back instead of calling replace(..., inplace=True) on df['col'].
# That chained in-place form modifies an intermediate Series, and under pandas
# Copy-on-Write it no longer updates df itself.
df['TotalDiscount'] = df['TotalDiscount'].replace(to_replace=np.nan, value=0)
df['TotalPrice'] = df['TotalPrice'].replace(to_replace=np.nan, value=0)

In [None]:
# Use -9999999 as a sentinel for missing order/hub IDs.
# Assign back rather than using chained replace(..., inplace=True), which does not
# propagate to df under pandas Copy-on-Write.
df['MasterOrderID'] = df['MasterOrderID'].replace(to_replace=np.nan, value=-9999999)
df['HubNumberID'] = df['HubNumberID'].replace(to_replace=np.nan, value=-9999999)

In [None]:
df.replace(to_replace = np.nan, value ='', inplace=True)

### Saving Data to Postgres

##### See https://docs.sqlalchemy.org/en/14/core/engines.html for how to build the connection string for different RDBMSs

In [4]:
### My parameters
# Postgres settings: database_pg feeds the SQLAlchemy engine below, and user/password
# are also passed to the Spark JDBC reads and writes later in the notebook.
database_pg = 'postgres'
# db_table = 'omnibiz'
user = 'postgres'
# Prompt interactively so the password is not stored in the notebook.
password = gp()

········


In [5]:
# default
# URL-encode the password. Characters such as '@', ':', '/' or '#' would otherwise be
# parsed as part of the URL structure and break the connection string.
from urllib.parse import quote_plus

engine = create_engine(f'postgresql://{user}:{quote_plus(password)}@localhost/{database_pg}')

In [6]:
con = engine.connect()

#### Saving data to Postgres using the connection above

In [None]:
start = perf_counter()
# Write the pandas DataFrame to table DATA_102 over the open connection.
# pandas' defaults apply here (index=True, if_exists='fail'):
#   - the DataFrame index is written as an extra column;
#   - re-running the cell fails once the table exists.
df.to_sql('DATA_102', con)
end = perf_counter()
print(f'This job ran for {end - start} seconds') # Duration in seconds

### Configuration to create a Spark Session

In [7]:
appName = "PySpark Session"  # This can be any name
master = "local[*]"  # run Spark locally using all available cores
# Java class-path rule: a bare directory entry only picks up .class files, not the JARs
# inside it. The trailing wildcard is needed for the JDBC driver JARs in this folder
# to be found.
conf = SparkConf() \
    .setAppName(appName) \
    .setMaster(master) \
    .set("spark.driver.extraClassPath", "C:\\spark-3.1.2-bin-hadoop3.2\\jars\\*")  # This has to be the path of your jars folder

In [8]:
# Build (or reuse) the Spark session from the conf above.
# SQLContext is deprecated since Spark 3.0, so the session comes from
# SparkSession.builder and the SparkContext is taken from it.
# Nothing in this notebook uses a SQLContext.
spark = SparkSession.builder.config(conf=conf).getOrCreate()
sc = spark.sparkContext



In [None]:
url = 'jdbc:sqlserver://localhost;databaseName={}'.format('test_db2')

In [None]:
# Read table test_tab2 from the SQL Server database addressed by `url` (test_db2 on localhost).
# NOTE(review): the "com.microsoft.sqlserver.jdbc.spark" format comes from the separate
# Apache Spark connector for SQL Server, not from the plain JDBC driver. Confirm it is
# on the classpath, or use .format("jdbc") with an explicit driver class as done later.
# NOTE(review): the credentials are hard-coded; consider prompting with gp() as done for Postgres.
jdbcDF = spark.read \
        .format("com.microsoft.sqlserver.jdbc.spark") \
        .option("url", url) \
        .option("dbtable", 'test_tab2') \
        .option("user", 'bi_test') \
        .option("password", 'password123').load()

In [None]:
# Write the Excel data to SQL Server through the Apache Spark connector for SQL Server.
# Replace the placeholder values (server, database, table, user, password) before running.
from py4j.protocol import Py4JJavaError
from pyspark.sql.utils import AnalysisException

server_name = "jdbc:sqlserver://{SERVER_ADDR}"  # replace {SERVER_ADDR} with your host[:port]
database_name = "database_name"
url = server_name + ";" + "databaseName=" + database_name + ";"

table_name = "table_name"
username = "username"
# Use a dedicated name. Assigning to `password` here would overwrite the Postgres
# password collected with gp(), which later Postgres JDBC cells still use.
mssql_password = "password123!#"  # Please specify password here

try:
    # `df` is the pandas DataFrame loaded from Excel. pandas has no `.write`,
    # so convert it to a Spark DataFrame first.
    spark.createDataFrame(df).write \
        .format("com.microsoft.sqlserver.jdbc.spark") \
        .mode("overwrite") \
        .option("url", url) \
        .option("dbtable", table_name) \
        .option("user", username) \
        .option("password", mssql_password) \
        .save()
except (ValueError, TypeError, AnalysisException, Py4JJavaError) as error:
    # What each exception covers:
    #   - TypeError: schema inference on mixed-type pandas columns.
    #   - AnalysisException / Py4JJavaError: connector or JDBC failures raised from the JVM.
    print("Connector write failed", error)

In [None]:
spark

#### Reading a table from Postgres with the parameters above

In [9]:
db_table = 'public.omnibiz'

In [10]:
# Read the Postgres table named by db_table through Spark's JDBC source.
# It uses the same database (database_pg) and credentials as the SQLAlchemy engine.
df1 = spark.read.format("jdbc").option("url", f"jdbc:postgresql://localhost:5432/{database_pg}") \
    .option("dbtable", db_table) \
    .option("user", user) \
    .option("password", password) \
    .option("driver", "org.postgresql.Driver") \
    .load()

In [23]:
pwd()

'C:\\Users\\abiodun.sanni'

In [26]:
df_csv = spark.read.csv(path = 'C:\\Users\\abiodun.sanni\\Downloads\\Ominibiz_Data\\Omnibiz_csvv.csv', 
                        sep = '|', inferSchema=True, header=True)

In [27]:
df_csv.count()

69286

In [12]:
df1.show(5)

+---------------+-------------+-----------+-------------+--------------------+----------------+----------+------------+----------+------------+---------------+-----------------+--------------------+-----------+-----------------+----------+-------------+-------------+--------------+----------------+---------+
|orderplaceddate|masterorderid|hubnumberid|orderlocation|         hublocation|orderplacedmonth|customerid|customername|totalprice|orderchannel|customer_status|orderfromcustomer| callcentreagentname|orderstatus|ordercancelreason|couponcode|totaldiscount|promodiscount|walletdiscount|        cityname|statename|
+---------------+-------------+-----------+-------------+--------------------+----------------+----------+------------+----------+------------+---------------+-----------------+--------------------+-----------+-----------------+----------+-------------+-------------+--------------+----------------+---------+
|     2021-03-01|       490241|     490932|        Lagos|Omnibiz Retai

In [14]:
df1.createOrReplaceTempView('omnibiz_view')

In [15]:
df2 = spark.sql("""SELECT orderplaceddate, orderlocation, orderplacedmonth,
                customerid,customername,totalprice,orderchannel FROM omnibiz_view""")

In [18]:
df1.printSchema()

root
 |-- orderplaceddate: date (nullable = true)
 |-- masterorderid: string (nullable = true)
 |-- hubnumberid: string (nullable = true)
 |-- orderlocation: string (nullable = true)
 |-- hublocation: string (nullable = true)
 |-- orderplacedmonth: string (nullable = true)
 |-- customerid: string (nullable = true)
 |-- customername: string (nullable = true)
 |-- totalprice: integer (nullable = true)
 |-- orderchannel: string (nullable = true)
 |-- customer_status: string (nullable = true)
 |-- orderfromcustomer: string (nullable = true)
 |-- callcentreagentname: string (nullable = true)
 |-- orderstatus: string (nullable = true)
 |-- ordercancelreason: string (nullable = true)
 |-- couponcode: string (nullable = true)
 |-- totaldiscount: string (nullable = true)
 |-- promodiscount: integer (nullable = true)
 |-- walletdiscount: integer (nullable = true)
 |-- cityname: string (nullable = true)
 |-- statename: string (nullable = true)



In [17]:
df2.show(6)

+---------------+-------------+----------------+----------+--------------+----------+------------+
|orderplaceddate|orderlocation|orderplacedmonth|customerid|  customername|totalprice|orderchannel|
+---------------+-------------+----------------+----------+--------------+----------+------------+
|     2021-03-01|        Lagos|           March|   3783983|  Iya Suleiman|     38450|        null|
|     2021-03-01|        Lagos|           March|   4010701|  Mummy jamali|     37700|  CallCentre|
|     2021-03-01|        Lagos|           March|   2019216|     Mrs Toyin|     27420|  CallCentre|
|     2021-03-01|        Lagos|           March|   1821255|        Mariam|     22340|  CallCentre|
|     2021-03-01|        Lagos|           March|   3908446|         Mr Ab|     25500|  CallCentre|
|     2021-03-01|        Lagos|           March|   3864852|ALHAJA TENIOLA|     38450|        null|
+---------------+-------------+----------------+----------+--------------+----------+------------+
only showi

In [21]:
# WHERE 1=2 is never true, so df3 has omnibiz_view's columns/schema but no rows.
df3 = spark.sql("""SELECT * FROM omnibiz_view
                WHERE 1=2""")

In [40]:
ll = [i.lower() for i in df_csv.columns]

In [42]:
type(df_csv.columns)

list

In [29]:
'UPPER'.lower()

'upper'

In [32]:
# Column names as read from the CSV header (a plain Python list).
df_csv.columns

['OrderPlacedDate',
 'MasterOrderID',
 'HubNumberID',
 'OrderLocation',
 'HubLocation',
 'OrderPlacedMonth',
 'customerid',
 'Customerme',
 'TotalPrice',
 'OrderChannel',
 'Customer_Status',
 'OrderFromCustomer',
 'CallCentreAgentme',
 'OrderStatus',
 'OrderCancelReason',
 'couponcode',
 'TotalDiscount',
 'PromoDiscount',
 'walletDiscount',
 'Cityme',
 'Stateme']

In [28]:
# Overwrite table public.omnibiz with the CSV data via Spark's JDBC writer.
# NOTE(review): this targets the `ability` database, while the other Postgres reads and
# writes in this notebook use `postgres` (database_pg). Confirm which is intended.
# NOTE(review): df_csv keeps the CSV's mixed-case column names; the lower-cased list
# `ll` computed earlier is not applied before writing.
df_csv.write.format("jdbc") \
   .mode("overwrite") \
   .option("url", "jdbc:postgresql://localhost:5432/ability") \
   .option("dbtable", "public.omnibiz") \
   .option("user", user) \
   .option("password", password) \
   .save()

In [None]:
df1.show(n=3)

In [None]:
display(df1)

In [None]:
df1.groupBy('OrderLocation').count().show()

In [None]:
df1.filter(f.column('OrderLocation').isin(['Lagos','Kaduna']))\
    .select('OrderPlacedDate','OrderLocation','TotalPrice')\
    .show(truncate=False)

In [None]:
df1.filter(f.column('OrderLocation') == '')\
    .select('OrderPlacedDate','OrderLocation','TotalPrice')\
    .show()

In [None]:
df1.where(f.column('OrderLocation') == '')\
    .select('OrderPlacedDate','OrderLocation','TotalPrice')\
    .show()

In [None]:
df1.where('OrderLocation == ""')\
    .select('OrderPlacedDate','OrderLocation','TotalPrice')\
    .show()

##### Creating a Temporary View of the dataframe above

In [None]:
df1.createOrReplaceTempView("df1_view")

## Working with Spark SQL

#### Querying the view above using Spark SQL

In [None]:
#### April - May, 2021 sales
start = perf_counter() 
### Transformation 1
# This is a transformation: spark.sql only builds the query plan here, and no rows are read
# until an action such as .show() runs. The printed time therefore covers query
# analysis/planning, not the data scan.
Apr_May_2021_sales = spark.sql("""SELECT OrderPlacedDate, HubNumberID, OrderLocation, customerid, TotalPrice
                                  FROM df1_view WHERE OrderPlacedDate >= '2021-04-01' AND OrderPlacedDate < '2021-06-01'""")
end = perf_counter()
print(f'This job ran for {end - start} seconds') # Duration in seconds

In [None]:
Apr_May_2021_sales.show()

In [None]:
### Transformation 2
Apr_May_2021_sales.createOrReplaceTempView('Apr_May_2021_sales')
Apr_May_2021_aggregated = spark.sql("""SELECT date_format(OrderPlacedDate, 'yyyy-MM') YEAR_MONTH, OrderLocation, SUM(TotalPrice) REVENUE
                                        FROM Apr_May_2021_sales
                                        GROUP BY date_format(OrderPlacedDate, 'yyyy-MM'), OrderLocation""")

In [None]:
### Action 1
Apr_May_2021_aggregated.show()

#### Top 3 Daily sales per month using Spark SQL

In [None]:
#### Top 3 daily sales per month
start = perf_counter()
spark.sql("""SELECT YEAR_MON, OrderPlacedDate, daily_rev
            FROM (SELECT YEAR_MON, OrderPlacedDate, daily_rev, RANK() OVER (PARTITION BY YEAR_MON ORDER BY daily_rev DESC) RNK
            FROM (SELECT DATE_FORMAT(OrderPlacedDate, 'yyyy-MM') YEAR_MON, OrderPlacedDate, sum(TotalPrice) daily_rev 
            FROM df1_view 
            GROUP BY DATE_FORMAT(OrderPlacedDate, 'yyyy-MM'), OrderPlacedDate) TT1) TT2
            WHERE RNK <= 3
            ORDER BY 1, 3 DESC""").show()
end = perf_counter()
print(f'This job ran for {end - start} seconds')

You can view Spark Jobs at: http://localhost:4040

In [None]:
df1.printSchema()

In [None]:
df1.show(n=3)

In [None]:
df1.count()

In [None]:
type(df1)

#### Performing simple transformations & aggregations with PySpark

In [None]:
#### April - May, 2021 sales
start = perf_counter()
df1.select('OrderPlacedDate', 'HubNumberID', 'OrderLocation', 'customerid', 'TotalPrice')\
    .filter("OrderPlacedDate >= '2021-04-01' and OrderPlacedDate < '2021-06-01' ")\
    .show(n=20, truncate=False)
end = perf_counter()
print(f'This job ran for {end - start} seconds') # Duration in seconds

#### Top 3 Daily sales per month using PySpark

In [None]:
from pyspark.sql import Window

In [None]:
### Creating a window
start = perf_counter()
# Rank days within each month (YEAR_MON) by daily revenue, highest first.
# rank() gives tied days the same rank, so a month can contribute more than 3 rows.
ranking = Window.partitionBy("YEAR_MON")\
            .orderBy(f.col("daily_rev").desc())
# Sum TotalPrice per day, rank the days within their month, keep ranks 1-3.
# No action is called here, so the timing below covers only building the lazy plan;
# the data is processed when the DataFrame is written/shown.
top3_dailySales_per_month = df1.select(f.date_format("OrderPlacedDate","yyyy-MM").alias("YEAR_MON"), "OrderPlacedDate", "TotalPrice")\
    .groupBy("YEAR_MON", "OrderPlacedDate")\
    .agg(f.sum("TotalPrice").alias("daily_rev"))\
    .select("YEAR_MON", "OrderPlacedDate", "daily_rev", f.rank().over(ranking).alias("RNK"))\
    .filter("RNK<=3")\
    .select("YEAR_MON", "OrderPlacedDate", "daily_rev")
end = perf_counter()
print(f'This job ran for {end - start} seconds') # Duration in seconds

#### Writing the aggregated data to your RDBMS

In [None]:
top3_dailySales_per_month.write.format("jdbc") \
   .mode("overwrite") \
   .option("url", "jdbc:postgresql://localhost:5432/postgres") \
   .option("dbtable", "public.top3_sales") \
   .option("user", user) \
   .option("password", password) \
   .save()

### THERE IS MORE YOU CAN ACHIEVE

### Stopping the Spark Session

In [None]:
spark.stop()

#### Working with SQL Server

In [None]:
import pymssql
import pyodbc
from urllib.parse import quote_plus

# SQL Server connection parameters (same values as the parameter cell that follows).
# They are set here because create_engine below needs them; running the notebook
# top to bottom would otherwise raise a NameError.
database = 'test_db2'
user2 = 'bi_user'
hostname = "localhost"
port = 1433

# pymssql dialect. The password (collected with gp() earlier) is URL-encoded so
# characters such as '@' or '/' cannot break the connection URL.
engine = create_engine('mssql+pymssql://{}:{}@{}:{}/{}'.format(user2, quote_plus(password), hostname, port, database))

#### Defining parameters for SQL Server

In [None]:
# SQL Server connection parameters for the JDBC read below.
# The `password` collected with gp() for Postgres is reused for SQL Server as-is.
database = 'test_db2'
url2 = 'jdbc:sqlserver://localhost;databaseName={}'.format(database)
user2 = 'bi_user'
hostname = "localhost"
port = 1433

#### Reading a table from SQL Server with the parameters above

In [None]:
# spark.stop() was called earlier in the notebook. getOrCreate() returns the active
# session if one exists, otherwise it starts a new one from the same conf.
spark = SparkSession.builder.config(conf=conf).getOrCreate()

# Read test_tab2 from SQL Server via the Microsoft JDBC driver.
df2 = spark.read.format("jdbc").option("url", url2) \
    .option("dbtable", 'test_tab2') \
    .option("user", user2) \
    .option("password", password) \
    .option("driver", "com.microsoft.sqlserver.jdbc.SQLServerDriver") \
    .load()

In [None]:
print('This is a \" and another \' and \tand \v and \r in this string')

In [None]:
a=50
b=39
a=b  # a is rebound to the very object b refers to
# id() builds a fresh int on every call, so `id(a) is id(b)` compares two distinct int
# objects and is typically False even though a and b are the same object.
# Compare the ids by value instead (equivalently: `a is b`).
same_object = id(a) == id(b)
same_object

In [None]:
# Only a cell's last expression is displayed, so show all three results in one tuple.
(a, b), (id(a), id(b)), id(a) == id(b)

In [None]:
b is a

In [None]:
a = 3
b = 3
# In CPython this is True because small ints (-5..256) are cached, shared objects.
# That is an implementation detail, so compare values with ==, not `is`.
a is b

In [None]:
a

In [None]:
def add(a, b):
    """Return ``a * 3``.

    Despite the name, this multiplies; it does not add.
    The ``b`` argument is ignored because it is rebound to 3 before use.
    For immutable ``a`` (int, float, str), ``a *= b`` rebinds only the local name,
    so the caller's variable is unchanged.
    For a list it extends the caller's object in place.
    """
    b = 3
    a *=b
    return a

In [None]:
add(a,b)

In [None]:
def factn(n):
    """Return the factorial of ``n``, computed iteratively (no recursion limit).

    Input handling:
    - A negative or non-integral ``n`` (including NaN and infinity) returns the
      string ``'This is not a valid number'``. This error value is kept for existing callers.
    - A whole-number float such as ``5.0`` returns a float (``120.0``).
    """
    # n % 1 is non-zero for fractional values and NaN for nan/inf, so all are rejected.
    # Previously a fraction like 2.5 recursed down to the error string and then crashed
    # multiplying a number by it.
    if n < 0 or n % 1 != 0:
        return 'This is not a valid number'
    # Collect n, n-1, ..., 1 exactly as the recursive version produced them.
    factors = []
    while n >= 1:
        factors.append(n)
        n -= 1
    # Multiply from the smallest factor up: ((1*1)*2)*...*n, the same association the
    # recursive version used, so float inputs round identically.
    result = 1
    for factor in reversed(factors):
        result = factor * result
    return result

In [None]:
factn(5)

In [None]:
import math

In [None]:
import sys

In [None]:
# sys.argv is a list; an empty subscript `sys.argv[]` is a SyntaxError.
# Index it (e.g. sys.argv[0]) to get a single entry.
sys.argv

In [None]:
'Z' > 'y'

In [None]:
cc='Hello world'

In [None]:
# Announce every space character in the sentence; skip all other characters.
for i in 'Hello world!':
    if i != ' ':
        continue
    print('This is a space')

In [None]:
def addtwo(a):
    """Print ``a`` tripled when it is an int or float, otherwise print a rejection message.

    'It finally worked' is always printed afterwards.
    """
    try:
        is_number = isinstance(a, (int, float))
        print(a * 3 if is_number else 'Not a valid argument')
    finally:
        print('It finally worked')

In [None]:
addtwo('a')

In [None]:
def aPower3(a):
    """Print ``a`` cubed. If cubing raises TypeError, print the error instead.

    'It finally worked' is printed afterwards in every case.
    """
    try:
        cube = a ** 3
        print(cube)
    except TypeError as exc:
        print(f'Error: {exc}')
    finally:
        print('It finally worked')

In [None]:
aPower3('a')

In [None]:
import hashlib

In [None]:
hash_obj=hashlib.sha256(b'hello')
hex_dig = hash_obj.hexdigest()
print(hex_dig)

In [None]:
# c -99
# A - 65
# r - 114
# 1 - 49
# Base-131 polynomial hash of the character codes above.
# The expected value belongs in a comment: `-- 223691457` would parse as
# subtracting a negative number and double the result.
rolling_hash = 99*131**3 + 65*131**2 + 114*131 + 49  # 223691457
rolling_hash

In [None]:
223691457%131, 223691457//131

In [None]:
1707568%131, 1707568//131

In [None]:
13034%131, 13034//131

In [None]:
99%131, 99//131

In [None]:
chr(244736787%131), 244736787%131, 244736787//131

In [None]:
chr(1868219%131), 1868219%131, 1868219//131

In [None]:
chr(14261%131), 14261%131, 14261//131

In [None]:
list(reversed(['99','65','114']))

In [None]:
ll[:-1]

In [None]:
a=None

In [None]:
if a is None:
    print('a is none')

In [None]:
cc

In [None]:
type(chr(49))

In [None]:
ll = [[1,2],[3,4]]
# `cat` is not defined in Python (NameError). Concatenate the inner lists instead.
flat = []
for i in ll:
    flat.extend(i)
flat

In [None]:
### zip, chain
### Explore itertools module
# zip() returns a lazy iterator whose repr hides the pairs; list() materializes them.
pairs = list(zip([1,2,3],[4,5,6]))
pairs

In [None]:
from itertools import chain
# A bare expression inside a loop is never displayed by Jupyter, so print each item.
for i in chain([1,2,3],[4,5,6,7,8]):
    print(i)

In [None]:
ll = [1,5,2,3,5]
ll = sorted(ll)
ll

In [None]:
ll.remove(5)
ll

In [None]:
ll.append([1,2])
ll

In [None]:
set1 = {1, 2, 3, 4, 5}
set2 = {3, 4, 5, 6, 7}
# Common elements in descending order. sorted() accepts any iterable,
# so no intermediate list() is needed.
sorted(set1 & set2, reverse=True)

In [None]:
''.join(['a','b','cd'])

In [None]:
['odd' for i in range(1,21) if i%2!=0]

In [None]:
a = [1,2,3,4,5]
b = [3,4,5,6,7]
# For each value in b, print it once per element of a that does not exceed it.
# Scanning of a stops at the first element larger than the value.
for i in b:
    for j in a:
        if i >= j:
            print(i)
            continue
        break

In [None]:
ll.insert(0, [11,12,13])
ll