In [1]:
from pyspark import SparkContext, SparkConf
from pyspark.sql import SparkSession

In [2]:
sc = SparkContext.getOrCreate(SparkConf().setMaster("local[*]"))

spark = SparkSession \
    .builder \
    .getOrCreate()

In [3]:
df = spark.read.csv('point_12345.csv', inferSchema=True, header=True)

In [4]:
extract_cols = ['state', 'county', 'site name', 'naics description', 'site latitude',
           'site longitude', 'city', 'pollutant desc', 'pollutant type(s)',
           'total emissions']

In [5]:
edf = df.select(*extract_cols)

In [6]:
df2 = spark.read.csv('point_678910.csv', inferSchema=True, header=True)

In [7]:
edf2 = df2.select(*extract_cols)

In [8]:
edf.join(edf2)

DataFrame[state: string, county: string, site name: string, naics description: string, site latitude: string, site longitude: string, city: string, pollutant desc: string, pollutant type(s): string, total emissions: string, state: string, county: string, site name: string, naics description: string, site latitude: string, site longitude: string, city: string, pollutant desc: string, pollutant type(s): string, total emissions: double]

In [8]:
df.count()

3481088

In [9]:
df2.count()

5528730

In [16]:
sdf = edf.union(edf2)

In [11]:
sdf.count()

9009818

In [18]:
sdf.columns

['state',
 'county',
 'site name',
 'naics description',
 'site latitude',
 'site longitude',
 'city',
 'pollutant desc',
 'pollutant type(s)',
 'total emissions']

In [21]:
sdf =sdf.withColumnsRenamed({'site name':'site_name','naics description': 'industry','site latitude':'latitude',"site longitude":'longitude','pollutant desc':'pollutant','pollutant type(s)':'poll_ctg','total emissions':'total_emissions'})

In [37]:
sdf.show()

+-----+----------+--------------------+--------------------+---------+----------+--------------+--------------------+--------+---------------+
|state|    county|           site_name|            industry| latitude| longitude|          city|           pollutant|poll_ctg|total_emissions|
+-----+----------+--------------------+--------------------+---------+----------+--------------+--------------------+--------+---------------+
|   CT|   Windham|PLAINFIELD RENEWA...|Other Electric Po...|41.663864|-71.924373|    PLAINFIELD|Decachlorobipheny...|     HAP|    0.000754162|
|   ME| Penobscot|COLD BROOK ENERGY...|Petroleum Bulk St...|44.779078|-68.783157|       HAMPDEN| Sulfur Hexafluoride|     GHG|              0|
|   ME| Penobscot|       VERSANT POWER|Electric Power Di...|44.799928|-68.736942|        BANGOR| Sulfur Hexafluoride|     GHG|         0.1324|
|   ME|Washington|NAVAL SUPPORT ACT...|   National Security|44.645007|-67.283141|        CUTLER| Sulfur Hexafluoride|     GHG|            0.4|

In [22]:
sdf.columns

['state',
 'county',
 'site_name',
 'industry',
 'latitude',
 'longitude',
 'city',
 'pollutant',
 'poll_ctg',
 'total_emissions']

In [23]:
sdf.createOrReplaceTempView('df')

In [29]:
spark.sql('select state, sum(total_emissions) from df group by state order by sum(total_emissions) desc limit 10').show()

+-----+--------------------+
|state|sum(total_emissions)|
+-----+--------------------+
|   TX| 4.072175716753692E8|
|   LA|1.6087889972410706E8|
|   FL| 1.495419340140622E8|
|   OH| 1.245438019857818E8|
|   IN|  1.20842531831258E8|
|   PA|1.2036651803065033E8|
|   AL|1.0898535269249356E8|
|   IL|1.0374672328692727E8|
|   CA|1.0061518869665462E8|
|   GA| 8.919194879008561E7|
+-----+--------------------+



In [32]:
spark.sql('select city, sum(total_emissions) as total_pollution from df group by city order by sum(total_emissions) desc limit 10').show()

+--------------+--------------------+
|          city|     total_pollution|
+--------------+--------------------+
|    Sweetwater| 2.452315994588952E7|
|      Cheshire| 2.062853765631697E7|
|   PORT ARTHUR| 1.964489115416678E7|
|       Quinton| 1.894257893232332E7|
|       BAYTOWN|1.7745490611830156E7|
|       LABADIE|1.7577794345713653E7|
|      Maricopa| 1.751677938303785E7|
|CORPUS CHRISTI|1.5637102928466367E7|
|       HOUSTON|1.5349329664186778E7|
|         TATUM|1.5338468926330501E7|
+--------------+--------------------+



In [35]:
spark.sql('select county, sum(total_emissions) as total_pollution from df group by county order by sum(total_emissions) desc limit 10').show()

+-----------+--------------------+
|     county|     total_pollution|
+-----------+--------------------+
|  Jefferson|1.1896568075375293E8|
|     Harris| 6.015673890824691E7|
| Washington| 3.587487080873698E7|
|     Monroe| 3.356192264005195E7|
|Los Angeles|2.7543810241346207E7|
|       Lake|2.6823505063828953E7|
|    Jackson|2.5787102516653806E7|
| Sweetwater| 2.453245453371554E7|
|    Lincoln|2.4181538905438203E7|
|St. Charles|2.4174287421151415E7|
+-----------+--------------------+



In [33]:
spark.sql('select industry, sum(total_emissions) as total_pollution from df group by industry order by sum(total_emissions) desc limit 10').show()

+--------------------+--------------------+
|            industry|     total_pollution|
+--------------------+--------------------+
|Fossil Fuel Elect...|1.2503448738085728E9|
|Electric Power Ge...| 2.023790979677431E8|
|Petroleum Refineries|1.9407990206573215E8|
|Electric Power Ge...| 1.049360423264049E8|
|Paper (except New...| 8.930789223354995E7|
|    Paperboard Mills|  7.99106389740636E7|
|Cement Manufacturing| 7.898487027265355E7|
|Petrochemical Man...| 6.846116048962235E7|
|          Pulp Mills| 5.441892292423914E7|
|Natural Gas Extra...| 5.101593366378633E7|
+--------------------+--------------------+



In [34]:
spark.sql('select pollutant, sum(total_emissions) as total_pollution from df group by pollutant order by sum(total_emissions) desc limit 10').show()

+--------------------+--------------------+
|           pollutant|     total_pollution|
+--------------------+--------------------+
|      Carbon Dioxide|2.5674802768007593E9|
|            Methanol|1.0185947542350157E8|
|              Hexane| 5.148554470683535E7|
|        Formaldehyde| 5.022772788926394E7|
|   Hydrochloric Acid|3.6396645884730265E7|
|             Styrene| 2.841544176547743E7|
|             Toluene| 2.824911781267032E7|
|Xylenes (Mixed Is...|1.8893736128186546E7|
|        Acetaldehyde|  1.87352876350081E7|
|    Hydrogen Sulfide| 1.857908610274387E7|
+--------------------+--------------------+



In [36]:
spark.sql('select poll_ctg, sum(total_emissions) as total_pollution from df group by poll_ctg order by sum(total_emissions) desc limit 10').show()

+--------+--------------------+
|poll_ctg|     total_pollution|
+--------+--------------------+
|     GHG| 2.571921477151565E9|
|     HAP|4.5780884460822546E8|
|   Other| 1.915585044394754E7|
|     CAP|   8120964.049714407|
| CAP/HAP|   663454.0716440291|
|    PFAS|  284.78101200000003|
|  0.8899|                null|
+--------+--------------------+

