## Connect to Spark standalone cluster

In [None]:
!/usr/local/spark/sbin/stop-all.sh

In [None]:
!/usr/local/spark/sbin/start-all.sh

In [None]:
!jps

In [None]:
import os
import pyspark
import pymssql
import pandas as pd
from pyspark import SparkContext, SparkConf, pandas as ps
from pyspark.sql import SparkSession

try:
    spark.stop()
except:
    print("No Spark Session")

sparkClassPath =  '/usr/local/spark/jars/sqljdbc42.jar'

spark = SparkSession.builder \
        .config("spark.driver.extraClassPath", sparkClassPath) \
        .config("spark.jars", sparkClassPath) \
        .appName("Southbridge Analytics") \
        .master("spark://sparkc:7077") \
        .getOrCreate()


## Connect to SQL Server and create DataFrame

In [None]:
server = "sql"
database = "adworks"
table = "SalesLT.Customer"
user = "sa"
password  = "P@ssw0rd"
 

sql1 = """
SELECT c.CustomerID, c.CompanyName,COUNT(soh.SalesOrderID) AS OrderCount 
FROM SalesLT.Customer AS c LEFT OUTER JOIN SalesLT.SalesOrderHeader AS soh 
ON c.CustomerID = soh.CustomerID 
GROUP BY c.CustomerID, c.CompanyName
"""

sql2 = """
SELECT c.CompanyName, a.AddressLine1, ISNULL(a.AddressLine2, '') AS AddressLine2,
a.City, a.StateProvince, a.PostalCode, a.CountryRegion, oh.SalesOrderID, oh.TotalDue
FROM SalesLT.Customer AS c
JOIN SalesLT.SalesOrderHeader AS oh
ON oh.CustomerID = c.CustomerID
JOIN SalesLT.CustomerAddress AS ca
ON c.CustomerID = ca.CustomerID AND AddressType = 'Main Office'
JOIN SalesLT.Address AS a
ON ca.AddressID = a.AddressID
"""


#read table data into a spark dataframe
jdbcDF1 = spark.read.format("jdbc") \
    .option("url", f"jdbc:sqlserver://{server}:1433;databaseName={database};") \
    .option("query", sql1) \
    .option("user", user) \
    .option("password", password) \
    .option("driver", "com.microsoft.sqlserver.jdbc.SQLServerDriver") \
    .load()

jdbcDF2 = spark.read.format("jdbc") \
    .option("url", f"jdbc:sqlserver://{server}:1433;databaseName={database};") \
    .option("query", sql2) \
    .option("user", user) \
    .option("password", password) \
    .option("driver", "com.microsoft.sqlserver.jdbc.SQLServerDriver") \
    .load()

In [None]:
jdbcDF1.show(5)
jdbcDF2.show(5)

## Write DataFrame to HDFS as Parquet

In [None]:
# create dir in hdfs if not already there
os.system('hdfs dfs -mkdir hdfs://localhost:9000/sql-spoke/')

In [None]:
jdbcDF1.write.parquet("hdfs://localhost:9000/sql-spoke/sql1.parquet")
jdbcDF2.write.parquet("hdfs://localhost:9000/sql-spoke/sql2.parquet")

## Validate Parquet from HDFS

In [None]:
testdf1 = spark.read.parquet("hdfs://localhost:9000/sql-spoke/sql1.parquet")
testdf2 = spark.read.parquet("hdfs://localhost:9000/sql-spoke/sql2.parquet")

In [None]:
testdf1.show(2)
testdf2.show(2)