In [None]:
#Created 2021-09-08
#Copyright Spencer W. Leifeld

In [None]:
from pyspark.sql import SparkSession as Session
from pyspark import SparkConf as Conf
from pyspark import SparkContext as Context

In [None]:
import os
# Environment for the Spark driver and the processes it launches on this workstation.
os.environ['SPARK_LOCAL_IP'] = '192.168.1.2'
os.environ['HADOOP_HOME'] = '/home/geno1664/Developments/Github_Samples/RDS-ENV/hadoop'

# os.environ stores strings verbatim: shell-style "$VAR" references are NOT expanded,
# so the library path has to be assembled from the real values. Any inherited
# LD_LIBRARY_PATH entries stay first, and the Hadoop native directory is appended
# only once so that re-running this cell does not keep growing the variable.
_hadoop_native_dir = os.environ['HADOOP_HOME'] + '/lib/native'
_library_dirs = [
    entry
    for entry in os.environ.get('LD_LIBRARY_PATH', '').split(os.pathsep)
    if entry
]
if _hadoop_native_dir not in _library_dirs:
    _library_dirs.append(_hadoop_native_dir)
os.environ['LD_LIBRARY_PATH'] = os.pathsep.join(_library_dirs)

# Python interpreter selection for PySpark, and the Arrow timezone switch.
os.environ['PYSPARK_DRIVER_PYTHON'] = 'jupyter'
os.environ['PYSPARK_DRIVER_PYTHON_OPTS'] = 'notebook'
os.environ['PYSPARK_PYTHON'] = 'python3'
os.environ['PYARROW_IGNORE_TIMEZONE'] = '1'

In [None]:
# Application identity and cluster master for this study.
configuration = Conf()
configuration.setAppName('RDS_2')
configuration.setMaster('spark://GenoMachine:7077')

# Resource settings, applied together. The call returns the conf object itself,
# so it remains the value displayed by this cell.
configuration.setAll([
    ('spark.executor.memory', '10G'),
    ('spark.driver.memory', '2G'),
    ('spark.cores.max', '8'),
])

In [None]:
# Obtain the SparkContext through getOrCreate so this cell can be executed again in a
# live kernel: constructing a second Context directly is rejected while one is active.
# NOTE: when a context already exists it is returned as-is and `configuration` is not
# re-applied -- restart the kernel after changing the configuration cell.
context = Context.getOrCreate(conf=configuration)

In [None]:
# Wrap the SparkContext in a SparkSession; `session` is what the CSV loaders below receive.
session = Session(sparkContext=context)

In [None]:
from Functions.IO import CSV_File

In [None]:
# Load Jobs.csv through the project's CSV_File helper.
csvDF = CSV_File(session, r'/home/geno1664/Developments/Github_Samples/RDS-ENV/Rural_Development_Study_No2/IO/Jobs.csv')

# Location keys followed by the per-sector employment percentage columns to keep.
jobColumns = [
    'State',
    'County',
    'PctEmpAgriculture',
    'PctEmpConstruction',
    'PctEmpMining',
    'PctEmpTrade',
    'PctEmpTrans',
    'PctEmpInformation',
    'PctEmpFIRE',
    'PctEmpServices',
    'PctEmpGovt',
    'PctEmpManufacturing',
]
employmentByJob = csvDF.GetSparkDF().select(*jobColumns)

In [None]:
# Replace the source column names with occupation-style labels, one rename per pair.
for _source_name, _occupation_label in (
    ('PctEmpAgriculture', 'Farmers'),
    ('PctEmpConstruction', 'Builders'),
    ('PctEmpMining', 'Miners'),
    ('PctEmpTrade', 'Retail_Associates'),
    ('PctEmpFIRE', 'Businessmen'),
    ('PctEmpServices', 'Hospitality_Associates'),
    ('PctEmpGovt', 'Civil_Servants'),
    ('PctEmpManufacturing', 'Craftsmen'),
    ('PctEmpInformation', 'Technologists'),
    ('PctEmpTrans', 'Teamsters'),
):
    employmentByJob = employmentByJob.withColumnRenamed(_source_name, _occupation_label)

In [None]:
# Drop the rows whose State is 'US' (presumably the national aggregate -- verify against
# the data), then repartition by State.
employmentByJob = (
    employmentByJob
    .where(employmentByJob['State'] != 'US')
    .repartition('State')
)

In [None]:
# Preview the renamed, filtered employment table.
employmentByJob.show()

In [None]:
# Load People.csv through the project's CSV_File helper.
csvDF = CSV_File(session, r'/home/geno1664/Developments/Github_Samples/RDS-ENV/Rural_Development_Study_No2/IO/People.csv')

# Location keys followed by the educational-attainment percentage columns to keep.
educationColumns = [
    'State',
    'County',
    'Ed1LessThanHSPct',
    'Ed2HSDiplomaOnlyPct',
    'Ed3SomeCollegePct',
    'Ed4AssocDegreePct',
    'Ed5CollegePlusPct',
]
educationRate = csvDF.GetSparkDF().select(*educationColumns)

In [None]:
# Replace the source column names with readable attainment labels, one rename per pair.
for _source_name, _attainment_label in (
    ('Ed1LessThanHSPct', 'Some_High_School'),
    ('Ed2HSDiplomaOnlyPct', 'High_School_Degree'),
    ('Ed3SomeCollegePct', 'Some_College'),
    ('Ed4AssocDegreePct', 'Associates_Degree'),
    ('Ed5CollegePlusPct', 'College_Graduate'),
):
    educationRate = educationRate.withColumnRenamed(_source_name, _attainment_label)

In [None]:
# Drop the rows whose State is 'US' (presumably the national aggregate -- verify against
# the data), then repartition by State.
educationRate = (
    educationRate
    .where(educationRate['State'] != 'US')
    .repartition('State')
)

In [None]:
# Preview the renamed, filtered education table.
educationRate.show()

In [None]:
from databricks import koalas as ks

In [None]:
# Reshape both wide tables to long form: State/County stay as identifiers and every
# other column becomes a (category, rate) pair. The round trip goes Spark -> Koalas
# (for melt) -> Spark; to_koalas() is presumably made available by the
# databricks.koalas import -- confirm if that import is ever changed.
employmentByJob = (
    employmentByJob
    .to_koalas()
    .melt(
        id_vars=['State', 'County'],
        var_name='Employment_Catagory',
        value_name='Employment_Rate',
    )
    .to_spark()
)
educationRate = (
    educationRate
    .to_koalas()
    .melt(
        id_vars=['State', 'County'],
        var_name='Education_Catagory',
        value_name='Education_Rate',
    )
    .to_spark()
)

In [None]:
# Combine the two long tables per (State, County), so each county row pairs one
# employment category with one education category.
# NOTE(review): this previously passed how='cross' together with join keys. A cross
# join takes no keys; Spark appears to resolve that combination as an inner equi-join
# on the keys (and some versions may reject it) -- confirm on the deployed version.
# 'inner' states the intended keyed join explicitly.
# Missing rates are then filled with 0 and rows lacking either category label are dropped.
mainDF = (
    employmentByJob
    .join(educationRate, on=['State', 'County'], how='inner')
    .fillna(0, subset=['Employment_Rate', 'Education_Rate'])
    .dropna(subset=['Employment_Catagory', 'Education_Catagory'], how='any')
)

In [None]:
# Preview the combined employment/education table.
mainDF.show()