In [58]:
import os

import pyspark
from pyspark.conf import SparkConf
from pyspark.context import SparkContext
from pyspark.sql import SparkSession

In [2]:
# Machine-specific paths. The defaults reproduce the original hard-coded
# locations; set the environment variables to run the notebook on another
# machine without editing this cell.
credentials_location = os.environ.get(
    'GOOGLE_APPLICATION_CREDENTIALS',
    '/home/om/.google/credentials/google_credentials.json',
)
gcs_connector_jar = os.environ.get(
    'GCS_CONNECTOR_JAR',
    '/home/om/lib/gcs-connector-hadoop3-2.2.5.jar',
)

# Local Spark configuration: all cores, GCS connector jar on the classpath,
# and service-account authentication using the key file above.
conf = (
    SparkConf()
    .setMaster('local[*]')
    .setAppName('test')
    .set("spark.jars", gcs_connector_jar)
    .set("spark.hadoop.google.cloud.auth.service.account.enable", "true")
    .set("spark.hadoop.google.cloud.auth.service.account.json.keyfile", credentials_location)
)

In [3]:
# Start the SparkContext from the configuration built in the previous cell.
sc = SparkContext(conf=conf)

# Hadoop-level settings for the gs:// scheme: the filesystem implementation
# classes and the service-account key used for authentication.
hadoop_conf = sc._jsc.hadoopConfiguration()
gcs_hadoop_settings = (
    ("fs.AbstractFileSystem.gs.impl", "com.google.cloud.hadoop.fs.gcs.GoogleHadoopFS"),
    ("fs.gs.impl", "com.google.cloud.hadoop.fs.gcs.GoogleHadoopFileSystem"),
    ("fs.gs.auth.service.account.json.keyfile", credentials_location),
    ("fs.gs.auth.service.account.enable", "true"),
)
for setting_name, setting_value in gcs_hadoop_settings:
    hadoop_conf.set(setting_name, setting_value)

22/04/22 06:40:44 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
Using Spark's default log4j profile: org/apache/spark/log4j-defaults.properties
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).


In [59]:
# Build (or fetch) the SparkSession using the running SparkContext's configuration.
session_builder = SparkSession.builder.config(conf=sc.getConf())
spark = session_builder.getOrCreate()

# NOTE(review): presumably stops the output committer from writing _SUCCESS
# marker files next to the parquet output — confirm.
spark.conf.set("mapreduce.fileoutputcommitter.marksuccessfuljobs", "false")

In [60]:
# Load the 1995 tree census parquet file from the GCS bucket.
census_1995_source = 'gs://nf_data_lake_speedy-carver-347016/NY_Tree_Census/new_york_tree_census_1995.parquet'
df_1995 = spark.read.parquet(census_1995_source)

In [61]:
# Load the 2015 tree census parquet file from the GCS bucket.
census_2015_source = 'gs://nf_data_lake_speedy-carver-347016/NY_Tree_Census/new_york_tree_census_2015.parquet'
df_2015 = spark.read.parquet(census_2015_source)

In [62]:
# Columns of the 1995 census to discard; what remains afterwards is
# status, borough and spc_common.
#
# The original tuple was missing a comma after 'sidewalk_condition', so Python
# concatenated it with the next literal into the non-existent column name
# 'sidewalk_conditionsupport_structure' (silently ignored by drop()), and the
# two names had been re-appended at the end as a workaround. Each column is
# now listed exactly once; the set of dropped columns is unchanged.
col_1995 = (
    'address', 'house_number', 'street', 'zip_original', 'cb_original', 'site',
    'species', 'diameter', 'wires', 'sidewalk_condition', 'support_structure',
    'x', 'y', 'longitude', 'latitude', 'cb_new', 'zip_new',
    'censustract_2010', 'censusblock_2010', 'nta_2010', 'segmentid',
    'spc_latin', 'location', 'recordid',
)

df_1995 = df_1995.drop(*col_1995)

In [63]:
# Preview the first 10 rows of the trimmed 1995 frame.
df_1995.show(n=10)

+---------+-------------+----------------+
|   status|      borough|      spc_common|
+---------+-------------+----------------+
|     Good|       Queens|         OAK PIN|
|Excellent|        Bronx|     HONEYLOCUST|
|     Good|Staten Island|OAK NORTHERN RED|
|     Poor|       Queens|    MAPLE NORWAY|
|     Good|       Queens|    MAPLE NORWAY|
|     Good|       Queens|         OAK PIN|
|     Poor|     Brooklyn|LONDON PLANETREE|
|     Good|Staten Island|          GINKGO|
|Excellent|        Bronx|     HONEYLOCUST|
|     Good|       Queens|       ASH GREEN|
+---------+-------------+----------------+
only showing top 10 rows



In [64]:
# Columns of the 2015 census to discard; what remains afterwards is
# health, spc_common and boroname.
col_2015 = (
    'tree_id', 'block_id', 'created_at', 'tree_dbh', 'stump_diam', 'curb_loc', 'status', 'spc_latin',
    'steward', 'guards', 'sidewalk', 'user_type', 'problems',
    'root_stone', 'root_grate', 'root_other',
    'trunk_wire', 'trnk_light', 'trnk_other',
    'brch_light', 'brch_shoe', 'brch_other',
    'address', 'zipcode', 'zip_city', 'cb_num', 'borocode', 'cncldist',
    'st_assem', 'st_senate', 'nta', 'nta_name', 'boro_ct', 'state',
    'latitude', 'longitude', 'x_sp', 'y_sp',
)
df_2015 = df_2015.drop(*col_2015)

In [66]:
from pyspark.sql.functions import lit

# Rename the 1995 columns to the names used by the 2015 frame
# (health, boroname) and tag every row with its census year.
df_1995 = (
    df_1995
    .withColumnRenamed("status", "health")
    .withColumnRenamed("borough", "boroname")
    .withColumn("census_year", lit(1995))
)

In [67]:
# Preview the 1995 frame after the rename and the added census_year column.
df_1995.show(n=10)

+---------+-------------+----------------+-----------+
|   health|     boroname|      spc_common|census_year|
+---------+-------------+----------------+-----------+
|     Good|       Queens|         OAK PIN|       1995|
|Excellent|        Bronx|     HONEYLOCUST|       1995|
|     Good|Staten Island|OAK NORTHERN RED|       1995|
|     Poor|       Queens|    MAPLE NORWAY|       1995|
|     Good|       Queens|    MAPLE NORWAY|       1995|
|     Good|       Queens|         OAK PIN|       1995|
|     Poor|     Brooklyn|LONDON PLANETREE|       1995|
|     Good|Staten Island|          GINKGO|       1995|
|Excellent|        Bronx|     HONEYLOCUST|       1995|
|     Good|       Queens|       ASH GREEN|       1995|
+---------+-------------+----------------+-----------+
only showing top 10 rows



In [69]:
# Tag every 2015 row with its census year (constant literal column).
census_year_2015 = lit(2015)
df_2015 = df_2015.withColumn("census_year", census_year_2015)

In [70]:
# Preview the first 10 rows of the trimmed 2015 frame.
df_2015.show(n=10)

+------+--------------------+---------+-----------+
|health|          spc_common| boroname|census_year|
+------+--------------------+---------+-----------+
|  Good|           green ash|   Queens|       2015|
|  Good|         honeylocust|   Queens|       2015|
|  Good|        Callery pear|   Queens|       2015|
|  Good|        Callery pear| Brooklyn|       2015|
|  Good|'Schubert' chokec...|   Queens|       2015|
|  Good|         honeylocust|Manhattan|       2015|
|  Fair|    northern red oak| Brooklyn|       2015|
|  Good|     American linden| Brooklyn|       2015|
|  Good|             pin oak|   Queens|       2015|
|  Good|        American elm|    Bronx|       2015|
+------+--------------------+---------+-----------+
only showing top 10 rows



In [71]:
# Load the tree species reference table from the GCS bucket.
species_source = 'gs://nf_data_lake_speedy-carver-347016/NY_Tree_Census/new_york_tree_species.parquet'
df_species = spark.read.parquet(species_source)

In [72]:
# Discard the 'comments' column from the species table.
unused_species_column = 'comments'
df_species = df_species.drop(unused_species_column)

In [73]:
# Print the column names and types of the species table.
df_species.printSchema()

root
 |-- species_scientific_name: string (nullable = true)
 |-- species_common_name: string (nullable = true)
 |-- form: string (nullable = true)
 |-- growth_rate: string (nullable = true)
 |-- fall_color: string (nullable = true)
 |-- environmental_tolerances: string (nullable = true)
 |-- location_tolerances: string (nullable = true)
 |-- notes_suggested_cultivars: string (nullable = true)
 |-- tree_size: string (nullable = true)



In [55]:
# Keep only rows whose health value is not the empty string.
# NOTE(review): a null health presumably also fails this predicate and is
# dropped as well — confirm against the data.
has_health_value = df_2015["health"] != ""
df_2015 = df_2015.filter(has_health_value)

In [56]:
# List the distinct health labels present in the 2015 frame.
health_labels_2015 = df_2015.select('health').distinct()
health_labels_2015.show()



+------+
|health|
+------+
|  Good|
|  Fair|
|  Poor|
+------+



In [84]:
# List the distinct health labels present in the 1995 frame.
health_labels_1995 = df_1995.select('health').distinct()
health_labels_1995.show()



+--------------+
|        health|
+--------------+
|     Excellent|
|          Dead|
|Planting Space|
|          Good|
|       Unknown|
|          Fair|
|         Stump|
|          Poor|
|         Shaft|
|      Critical|
+--------------+



In [80]:
# Keep only rows whose common species name is not the empty string.
has_species_name = df_2015["spc_common"] != ""
df_2015 = df_2015.filter(has_species_name)

In [24]:
# Write parquet output without compression.
# The option is set on the existing SparkSession: SQLContext is deprecated in
# favour of SparkSession, and this avoids a mid-notebook import.
spark.conf.set("spark.sql.parquet.compression.codec", "uncompressed")

In [81]:
# Write the cleaned 1995 census from a single partition.
# mode('overwrite') keeps the cell re-runnable: the default save mode raises
# once the output path exists, which breaks Restart & Run All.
df_1995.coalesce(1).write.mode('overwrite').parquet('gs://nf_data_lake_speedy-carver-347016/NY_Tree_Census/census_1995')

                                                                                

In [82]:
# Write the cleaned 2015 census from a single partition.
# mode('overwrite') keeps the cell re-runnable: the default save mode raises
# once the output path exists, which breaks Restart & Run All.
df_2015.coalesce(1).write.mode('overwrite').parquet('gs://nf_data_lake_speedy-carver-347016/NY_Tree_Census/census_2015')

                                                                                

In [83]:
# Write the species reference table (without the 'comments' column).
# mode('overwrite') keeps the cell re-runnable: the default save mode raises
# once the output path exists, which breaks Restart & Run All.
df_species.write.mode('overwrite').parquet('gs://nf_data_lake_speedy-carver-347016/NY_Tree_Census/tree_species')

                                                                                