# Geo Addressing SDK Sample for EMR using PySpark

This sample driver program demonstrates how to run the geocoding operation of the Geo Addressing SDK on an Amazon EMR cluster, reading the input addresses from a Snowflake table.

### Configuring the Spark Session
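- Before running the command below, copy the contents of the Geo Addressing SDK distribution into an HDFS directory.
- Provide the HDFS path of the SDK JAR located in the 'pyspark/driver' directory of the Geo Addressing SDK distribution, along with the paths of the Snowflake Connector for Spark and Snowflake JDBC driver JARs used to read the input table.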

```
%%configure -f
{
    "conf": {
        "spark.jars": "/user/hadoop/addressing/sdk/software/pyspark/driver/spectrum-bigdata-addressing-sdk-spark2_2.12-<PRODUCT_VERSION>.jar, /user/hadoop/addressing/lib/spark-snowflake_2.12-2.10.1-spark_3.2.jar, /user/hadoop/addressing/lib/snowflake-jdbc-3.13.22.jar",
        "spark.sql.legacy.allowUntypedScalaUDF": true
    }
}
```

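The `%%configure` body is plain JSON, and `spark.jars` takes a single comma-separated string of JAR paths. If you assemble that body from a list of paths, a minimal sketch could look like the following. `build_configure_body` is a hypothetical helper, not part of the SDK, and `<PRODUCT_VERSION>` still has to be replaced with your SDK version.

```python
import json

def build_configure_body(jar_paths, extra_conf=None):
    """Return the JSON text for a sparkmagic %%configure cell (hypothetical helper)."""
    # spark.jars expects one comma-separated string, not a JSON list.
    conf = {"spark.jars": ",".join(jar_paths)}
    if extra_conf:
        conf.update(extra_conf)
    return json.dumps({"conf": conf}, indent=4)

jars = [
    "/user/hadoop/addressing/sdk/software/pyspark/driver/spectrum-bigdata-addressing-sdk-spark2_2.12-<PRODUCT_VERSION>.jar",
    "/user/hadoop/addressing/lib/spark-snowflake_2.12-2.10.1-spark_3.2.jar",
    "/user/hadoop/addressing/lib/snowflake-jdbc-3.13.22.jar",
]
body = build_configure_body(jars, {"spark.sql.legacy.allowUntypedScalaUDF": True})

# Paste the printed text into a notebook cell.
print("%%configure -f")
print(body)
```

`json.dumps` writes the Python `True` as the JSON literal `true`, which matches the hand-written cell above.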
### Creating a Spark Session
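- Running this cell imports the required PySpark libraries. Because it is the first code cell, it also starts the Spark application and automatically instantiates a SparkSession with the configuration above, available as `spark`.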

```python
# Import the required PySpark libraries.
from pyspark.sql.functions import lit, col, udf, create_map
from pyspark.sql import SparkSession
from pyspark.sql.types import DoubleType, StringType, IntegerType
import time
```

```
Starting Spark application

ID  YARN Application ID             Kind     State  Current session?
0   application_1666251443800_0001  pyspark  idle   ✔

SparkSession available as 'spark'.
```

### Creating the Input DataFrame from a Snowflake Table Using the Snowflake Connector for Spark

```python
# Read the input DataFrame from a Snowflake table using the Snowflake Connector for Spark.
# Replace the placeholder values with your Snowflake account details.
sfOptions = {
  "sfUrl": "YOUR_SNOWFLAKE_URL",
  "sfUser": "USERNAME",
  "sfPassword": "PASSWORD",
  "sfDatabase": "DATABASE_NAME",
  "sfSchema": "SCHEMA_NAME",
  "sfWarehouse": "WAREHOUSE_NAME",
  "usestagingtable": "off",
  "query": "select ID, ADDRESS, COUNTRY from YOUR_TABLE limit 100"
}

SNOWFLAKE_SOURCE_NAME = "net.snowflake.spark.snowflake"

inputTable = spark.read.format(SNOWFLAKE_SOURCE_NAME) \
  .options(**sfOptions) \
  .load()

inputTable.show(5, truncate=False)
```


```
+--------+--------------------------------------------+------------------------+
|ID      |ADDRESS                                     |COUNTRY                 |
+--------+--------------------------------------------+------------------------+
|16616048|171 INDUSTRIAL WAY,BRISBANE,CA,94005        |UNITED STATES OF AMERICA|
|16617770|147 N LIVERMORE AVE STE C,LIVERMORE,CA,94550|UNITED STATES OF AMERICA|
|16478264|17 W 19TH ST,NEW YORK,NY,10011              |UNITED STATES OF AMERICA|
|16478276|3940 SLUSARIC RD,NORTH TONAWANDA,NY,14120   |UNITED STATES OF AMERICA|
|16553442|1341 CENTER DR UNIT B,MEDFORD,OR,97501      |UNITED STATES OF AMERICA|
+--------+--------------------------------------------+------------------------+
only showing top 5 rows
```

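Hard-coding `sfPassword` in a notebook leaves the credential in the notebook file. One alternative is to read the connection details from environment variables. This is a minimal sketch under that assumption. The variable names (`SNOWFLAKE_URL`, `SNOWFLAKE_USER`, and so on) and the `snowflake_options_from_env` helper are hypothetical, so adapt them to however your cluster provides secrets.

```python
import os

def snowflake_options_from_env(query, env=os.environ):
    """Build Snowflake Connector for Spark options from environment variables.

    The environment variable names are hypothetical placeholders.
    """
    mapping = {
        "sfUrl": "SNOWFLAKE_URL",
        "sfUser": "SNOWFLAKE_USER",
        "sfPassword": "SNOWFLAKE_PASSWORD",
        "sfDatabase": "SNOWFLAKE_DATABASE",
        "sfSchema": "SNOWFLAKE_SCHEMA",
        "sfWarehouse": "SNOWFLAKE_WAREHOUSE",
    }
    # Fail early, and list every missing variable at once.
    missing = [var for var in mapping.values() if var not in env]
    if missing:
        raise KeyError("Missing environment variables: " + ", ".join(missing))
    options = {key: env[var] for key, var in mapping.items()}
    # Keep the same non-credential options as the notebook cell.
    options["usestagingtable"] = "off"
    options["query"] = query
    return options

# Example with an explicit mapping instead of the real environment.
sample_env = {
    "SNOWFLAKE_URL": "YOUR_SNOWFLAKE_URL",
    "SNOWFLAKE_USER": "USERNAME",
    "SNOWFLAKE_PASSWORD": "PASSWORD",
    "SNOWFLAKE_DATABASE": "DATABASE_NAME",
    "SNOWFLAKE_SCHEMA": "SCHEMA_NAME",
    "SNOWFLAKE_WAREHOUSE": "WAREHOUSE_NAME",
}
sfOptions = snowflake_options_from_env(
    "select ID, ADDRESS, COUNTRY from YOUR_TABLE limit 100", env=sample_env)
```

You would then pass the resulting dictionary to `spark.read.format(SNOWFLAKE_SOURCE_NAME).options(**sfOptions)`, just as in the cell above.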
### Configuring the Spark Context
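- The SparkContext is accessible from the SparkSession as `spark.sparkContext`.
- Provide the HDFS path of the Python SDK ZIP file located in the 'pyspark/driver' directory of the Geo Addressing SDK distribution. `addPyFile` distributes the ZIP to all tasks run on this SparkContext so that the `addressing` modules can be imported.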

```python
# Get the SparkContext from the SparkSession.
sc = spark.sparkContext
sc.addPyFile('hdfs:///user/hadoop/addressing/sdk/software/pyspark/driver/spectrum-bigdata-addressing-sdk-pyspark-<PRODUCT_VERSION>.zip')
```


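`SparkContext.addPyFile` ships a `.zip` to the nodes and puts it on the Python search path. Python can then import packages directly from the archive through its built-in `zipimport` support. The sketch below mimics that mechanism locally with a throwaway archive. The package name `demo_sdk` and its `Builder` class are hypothetical stand-ins for the `addressing` package inside the SDK ZIP.

```python
import os
import sys
import tempfile
import zipfile

def make_demo_zip(directory):
    """Write a ZIP holding a tiny package named demo_sdk (hypothetical stand-in for the SDK ZIP)."""
    zip_path = os.path.join(directory, "demo_sdk.zip")
    with zipfile.ZipFile(zip_path, "w") as archive:
        archive.writestr("demo_sdk/__init__.py", "")
        archive.writestr(
            "demo_sdk/Builder.py",
            "class Builder:\n"
            "    def build(self):\n"
            "        return 'built'\n",
        )
    return zip_path

with tempfile.TemporaryDirectory() as tmp:
    zip_path = make_demo_zip(tmp)
    # Put the archive on the search path, roughly as addPyFile does for a .zip.
    sys.path.insert(1, zip_path)
    try:
        # zipimport resolves this import inside the archive.
        from demo_sdk.Builder import Builder
        result = Builder().build()
    finally:
        sys.path.remove(zip_path)

print(result)  # built
```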
### Importing the required Geo Addressing SDK Python classes

```python
# Import the required Geo Addressing SDK Python classes from the ZIP file added above.
from addressing.DownloadManagerBuilder import DownloadManagerBuilder
from addressing.AddressingBuilder import AddressingBuilder
from addressing.HDFSDownloader import HDFSDownloader
from addressing.S3Downloader import S3Downloader
from addressing.LocalFilePassthroughDownloader import LocalFilePassthroughDownloader
from addressing.PreferencesBuilder import PreferencesBuilder
from addressing.HadoopConfiguration import HadoopConfiguration
from addressing.UDFExecutor import UDFExecutor
```


### Creating the PySpark Geocode UDF
- For more information on configuring and creating the required UDF, please visit the documentation page: https://docs.precisely.com/docs/sftw/hadoop/landingpage/docs/geocoding/webhelp/Geocoding/source/geocoding/addressing/addressing_spark_api.html

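Each entry passed to `withOutputFields` has the form `<source path> as <alias>`, and the later cells read the results back by alias (for example `GEOCODE_OUTPUT.x` and `GEOCODE_OUTPUT.PB_KEY`). The sketch below is a rough illustration of the columns you can expect in the CSV written after flattening `GEOCODE_OUTPUT.*`. It rests on these assumptions:

- The alias after the last ` as ` becomes the struct field name, with surrounding quotes stripped.
- The error field comes last, which is consistent with the trailing `null` in the sample output.

`output_column_names` is a hypothetical helper, not an SDK function.

```python
def output_column_names(input_columns, output_field_specs, error_field):
    """Predict the columns produced by select("*", "GEOCODE_OUTPUT.*").drop("GEOCODE_OUTPUT").

    Hypothetical: assumes each spec's alias (text after the last " as ") becomes a
    struct field name with quotes stripped, followed by the error field.
    """
    aliases = []
    for spec in output_field_specs:
        _, sep, alias = spec.rpartition(" as ")
        if not sep:
            raise ValueError(f"missing ' as <alias>' in: {spec!r}")
        aliases.append(alias.strip().strip("'\""))
    return list(input_columns) + aliases + [error_field]

specs = [
    "address.formattedStreetAddress as formattedStreetAddress",
    "address.formattedLocationAddress as formattedLocationAddress",
    "location.feature.geometry.coordinates.x as x",
    "location.feature.geometry.coordinates.y as y",
    "customFields['PB_KEY'] as 'PB_KEY'",
]
columns = output_column_names(["ID", "ADDRESS", "COUNTRY"], specs, "error")
print(columns)
# ['ID', 'ADDRESS', 'COUNTRY', 'formattedStreetAddress', 'formattedLocationAddress', 'x', 'y', 'PB_KEY', 'error']
```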
```python
# Configure the AddressingBuilder with the location of the geocoding resources.
addressingBuilder = AddressingBuilder() \
    .withResourcesLocation("/mnt/pb/geocoding/software/resources/")

# Configure the UDF builder with the required preferences, output fields, and error field.
udfBuilder = addressingBuilder.udfBuilder() \
    .withPreferences(PreferencesBuilder().withReturnAllInfo(True).build()) \
    .withOutputFields("address.formattedStreetAddress as formattedStreetAddress",
                      "address.formattedLocationAddress as formattedLocationAddress",
                      "location.feature.geometry.coordinates.x as x",
                      "location.feature.geometry.coordinates.y as y",
                      "customFields['PB_KEY'] as 'PB_KEY'") \
    .withErrorField("error")

# Create the Geocode UDF.
geocodeUDF = udfBuilder.forGeocode()
```


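The `x` and `y` output fields come from `location.feature.geometry.coordinates`, which resembles a GeoJSON point. In that layout `x` is the longitude and `y` is the latitude. The sample output confirms this ordering: for the Brisbane, CA address, x = -122.406438 and y = 37.698568. Swapping the two is an easy mistake when deriving latitude and longitude columns. Here is a minimal sketch of the mapping with a basic range check. `xy_to_lat_lon` is a hypothetical helper, not part of the SDK.

```python
def xy_to_lat_lon(x, y):
    """Convert geocoder coordinates (x = longitude, y = latitude) to a (lat, lon) tuple.

    Raises ValueError when a value is outside the valid latitude/longitude range,
    which catches some (not all) accidental x/y swaps.
    """
    lon, lat = float(x), float(y)
    if not -90.0 <= lat <= 90.0:
        raise ValueError(f"latitude out of range: {lat}")
    if not -180.0 <= lon <= 180.0:
        raise ValueError(f"longitude out of range: {lon}")
    return lat, lon

print(xy_to_lat_lon(-122.406438, 37.698568))  # (37.698568, -122.406438)
```

Passing the arguments in the wrong order, as in `xy_to_lat_lon(37.698568, -122.406438)`, raises `ValueError` here because -122.4 is not a valid latitude. A swap such as New York's (-73.992429, 40.739649) would not be caught, because both values fall within ±90.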
```python
# Execute the Geocode UDF, passing the appropriate fields from the input data.
geocodeOutput = inputTable.withColumn(
    "GEOCODE_OUTPUT",
    UDFExecutor().apply(geocodeUDF,
                        create_map(lit("addressLines[0]"), col("ADDRESS"),
                                   lit("country"), col("COUNTRY")))
).persist()

# Flatten the GEOCODE_OUTPUT struct into columns and save the result to an HDFS directory as CSV.
geocodeOutput.select("*", "GEOCODE_OUTPUT.*").drop("GEOCODE_OUTPUT") \
    .write.mode("overwrite").option("header", "true").format("csv").save("/user/hadoop/addressing/output")

# Extract the required fields from the geocoded output.
# The x coordinate is the longitude and the y coordinate is the latitude.
geocodeOutput = geocodeOutput.withColumn("LAT", geocodeOutput["GEOCODE_OUTPUT"].y.cast(DoubleType()))
geocodeOutput = geocodeOutput.withColumn("LON", geocodeOutput["GEOCODE_OUTPUT"].x.cast(DoubleType()))
geocodeOutput = geocodeOutput.withColumn("PB_KEY", geocodeOutput["GEOCODE_OUTPUT"].PB_KEY)
geocodeOutput.select('GEOCODE_OUTPUT').show(5, truncate=False)
```

```
+--------------------------------------------------------------------------------------------------+
|GEOCODE_OUTPUT                                                                                    |
+--------------------------------------------------------------------------------------------------+
|{171 INDUSTRIAL WAY, BRISBANE, CA  94005-1003, -122.406438, 37.698568, P00002T1CTL2, null}        |
|{147 N LIVERMORE AVE STE C, LIVERMORE, CA  94550-3113, -121.770171, 37.683711, P00002T4TIZ5, null}|
|{17 W 19TH ST, NEW YORK, NY  10011-4332, -73.992429, 40.739649, P0000GL13ZK3, null}               |
|{3940 SLUSARIC RD, NORTH TONAWANDA, NY  14120-9558, -78.830092, 43.105778, P0000GL50UPY, null}    |
|{1341 CENTER DR UNIT B, MEDFORD, OR  97501-7947, -122.85291, 42.312725, P0000IVQPBAF, null}       |
+--------------------------------------------------------------------------------------------------+
only showing top 5 rows
```

### Displaying the Result

```python
# Display the result of the geocode operation.
geocodeOutput.select('ID', 'ADDRESS', 'COUNTRY', 'LON', 'LAT', 'PB_KEY').show(5, truncate=False)
```

+--------+--------------------------------------------+------------------------+-----------+---------+------------+
|ID      |ADDRESS                                     |COUNTRY                 |LON        |LAT      |PB_KEY      |
+--------+--------------------------------------------+------------------------+-----------+---------+------------+
|16616048|171 INDUSTRIAL WAY,BRISBANE,CA,94005        |UNITED STATES OF AMERICA|-122.406438|37.698568|P00002T1CTL2|
|16617770|147 N LIVERMORE AVE STE C,LIVERMORE,CA,94550|UNITED STATES OF AMERICA|-121.770171|37.683711|P00002T4TIZ5|
|16478264|17 W 19TH ST,NEW YORK,NY,10011              |UNITED STATES OF AMERICA|-73.992429 |40.739649|P0000GL13ZK3|
|16478276|3940 SLUSARIC RD,NORTH TONAWANDA,NY,14120   |UNITED STATES OF AMERICA|-78.830092 |43.105778|P0000GL50UPY|
|16553442|1341 CENTER DR UNIT B,MEDFORD,OR,97501      |UNITED STATES OF AMERICA|-122.85291 |42.312725|P0000IVQPBAF|
+--------+--------------------------------------------+-----------------