In [1]:
from pyspark.sql import SparkSession as Session
from pyspark import SparkConf as Conf
from pyspark import SparkContext as Context

In [2]:
import os
os.environ['SPARK_LOCAL_IP']='192.168.1.2'
os.environ['HADOOP_HOME']='/home/geno1664/Developments/Github_Samples/RDS-ENV/hadoop'
os.environ['LD_LIBRARY_PATH']='$LD_LIBRARY_PATH:$HADOOP_HOME/lib/native'
os.environ['PYSPARK_DRIVER_PYTHON']='jupyter'
os.environ['PYSPARK_DRIVER_PYTHON_OPTS']='notebook'
os.environ['PYSPARK_PYTHON']='python3'
os.environ['PYARROW_IGNORE_TIMEZONE']='1'

In [3]:
configuration = Conf().setAppName('RDS_1').setMaster('spark://GenoMachine:7077')
configuration.set('spark.executor.memory','10G').set('spark.driver.memory', '2G').set('spark.cores.max', '8')

<pyspark.conf.SparkConf at 0x7f4b46646a00>

In [4]:
context = Context(conf=configuration)

21/09/08 11:55:34 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
Using Spark's default log4j profile: org/apache/spark/log4j-defaults.properties
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).


In [5]:
session = Session(context)

In [6]:
from Functions.IO import CSV_File
mainCSV = CSV_File(session, r'/home/geno1664/Developments/Github_Samples/RDS-ENV/Rural_Development_Study_No1/IO/County_Classificiations.csv')

Now Reading: /home/geno1664/Developments/Github_Samples/RDS-ENV/Rural_Development_Study_No1/IO/County_Classificiations.csv




In [7]:
mainCSV.Schema()

root
 |-- FIPStxt: integer (nullable = true)
 |-- State: string (nullable = true)
 |-- County: string (nullable = true)
 |-- RuralUrbanContinuumCode2013: integer (nullable = true)
 |-- UrbanInfluenceCode2013: integer (nullable = true)
 |-- RuralUrbanContinuumCode2003: integer (nullable = true)
 |-- UrbanInfluenceCode2003: integer (nullable = true)
 |-- Metro2013: integer (nullable = true)
 |-- Nonmetro2013: integer (nullable = true)
 |-- Micropolitan2013: integer (nullable = true)
 |-- Type_2015_Update: integer (nullable = true)
 |-- Type_2015_Farming_NO: integer (nullable = true)
 |-- Type_2015_Manufacturing_NO: integer (nullable = true)
 |-- Type_2015_Mining_NO: integer (nullable = true)
 |-- Type_2015_Government_NO: integer (nullable = true)
 |-- Type_2015_Recreation_NO: integer (nullable = true)
 |-- Low_Education_2015_update: integer (nullable = true)
 |-- Low_Employment_2015_update: integer (nullable = true)
 |-- Population_loss_2015_update: integer (nullable = true)
 |-- Retirem

In [8]:
from databricks import koalas as ks
mainDF = mainCSV.GetSparkDF().select('State','County','Population_loss_2015_update','Type_2015_Farming_NO','Type_2015_Manufacturing_NO','Type_2015_Recreation_NO','Type_2015_Mining_NO')

In [9]:
mainDF = mainDF.repartition(50, 'State')

In [10]:
mainDF = mainDF.withColumnRenamed('Type_2015_Farming_NO', 'Farming')
mainDF = mainDF.withColumnRenamed('Type_2015_Manufacturing_NO', 'Manufacturing')
mainDF = mainDF.withColumnRenamed('Type_2015_Recreation_NO', 'Recreation')
mainDF = mainDF.withColumnRenamed('Type_2015_Mining_NO', 'Mining')
mainDF = mainDF.withColumnRenamed('Population_loss_2015_update', 'Pop_Loss')

In [11]:
mainDF = mainDF.dropna(how='any', subset=['State', 'County', 'Pop_Loss'])
mainDF = mainDF.dropna(how='all', subset=['Farming', 'Manufacturing', 'Recreation', 'Mining'])

In [12]:
mainDF.show(n=25)

+-----+------------+--------+-------+-------------+----------+------+
|State|      County|Pop_Loss|Farming|Manufacturing|Recreation|Mining|
+-----+------------+--------+-------+-------------+----------+------+
|   NH|     Belknap|       0|      0|            0|         1|     0|
|   NH|     Carroll|       0|      0|            0|         1|     0|
|   NH|    Cheshire|       0|      0|            0|         0|     0|
|   NH|        Coos|       1|      0|            0|         1|     0|
|   NH|     Grafton|       0|      0|            0|         1|     0|
|   NH|Hillsborough|       0|      0|            0|         0|     0|
|   NH|   Merrimack|       0|      0|            0|         0|     0|
|   NH|  Rockingham|       0|      0|            0|         0|     0|
|   NH|   Strafford|       0|      0|            0|         0|     0|
|   NH|    Sullivan|       0|      0|            0|         0|     0|
|   TX|    Anderson|       0|      0|            0|         0|     0|
|   TX|     Andrews|

In [20]:
farmingDF = mainDF.select('Pop_Loss', 'Farming').groupBy('Farming').mean()
manufacturingDF = mainDF.select('Pop_Loss', 'Manufacturing').groupBy('Manufacturing').mean()
recreationDF = mainDF.select('Pop_Loss', 'Recreation').groupBy('Recreation').mean()
miningDF = mainDF.select('Pop_Loss', 'Mining').groupBy('Mining').mean()

In [21]:
Pop_Losses = {}

In [22]:
Pop_Losses['Farming'] = farmingDF.first()['avg(Pop_Loss)']
Pop_Losses['Manufacturing'] = manufacturingDF.first()['avg(Pop_Loss)']
Pop_Losses['Recreation'] = recreationDF.first()['avg(Pop_Loss)']
Pop_Losses['mining'] = miningDF.first()['avg(Pop_Loss)']

In [23]:
print(Pop_Losses)

{'Farming': 0.5225225225225225, 'Manufacturing': 0.11976047904191617, 'Recreation': 0.03003003003003003, 'mining': 0.26244343891402716}
