In [0]:
#All imports go here
from shapely.geometry import shape
from shapely.ops import transform
from shapely.geometry.multipolygon import MultiPolygon
from shapely import wkt
from shapely.ops import transform

from functools import partial
import pyproj

import pyspark.sql.functions as pys
from pyspark.sql.types import DoubleType, StringType, FloatType, IntegerType, BooleanType
from pyspark.sql.functions import udf

import spark_df_profiling
      

In [0]:
#Location of data
fieldDir = "fields_min/*.json"
growerDir = "/Users/jay_lohokare/Desktop/data/grower_min/*.json"
countriesDir = "country/*.json"
sampleDir = "sample_event/*.txt"

In [0]:
field = sqlContext.read.json(fieldDir)

In [0]:
field = field.select('field.id', 'field.boundary_map.area', 'field.boundary_map.boundary','field.associations.`agrian.grower`', 'field.associations.`agrian.farm`').withColumnRenamed('agrian.farm','farm_id').withColumnRenamed('agrian.grower','grower_id')

In [0]:
#Function to convert MultiPoly to Area in Acre
def getAreaOfMultiPoly(columnElement):
    try:
        s = unicode(str(columnElement), "utf-8")
        p = wkt.loads(s)
        m = MultiPolygon(p)
        
        project = partial(
        pyproj.transform,
        pyproj.Proj(init='epsg:4326'), # source coordinate system
        pyproj.Proj(init='epsg:26913')) # destination coordinate system

        m2 = transform(project, m)
        return m2.area/4046.86
    except:
        return 0
    
#Check if field contains non empty boundary data
def isBoundaryPresent(columnElement):
    try:
        columnElement = s = unicode(str(columnElement), "utf-8")
        if columnElement == "MULTIPOLYGON EMPTY" or columnElement == None or columnElement == "" or columnElement == " ":
            return False
        else:
            return True
    except:
        return False
    
    
#Function to calculate % difference in area calculated and area in JSON
def calculateDifferencePercentageInArea(area, areaCalculated):
    try:
        return (area-areaCalculated)*100/areaCalculated
    except:
        return 0

getAreaOfMultiPoly_udf = udf(lambda z: getAreaOfMultiPoly(z), DoubleType())
getAreaDifferencePercentage = udf(lambda (x,y): calculateDifferencePercentageInArea(float(x),float(y)), DoubleType())
checkIfBoundaryPresent = udf(lambda z: isBoundaryPresent(z), BooleanType())


In [0]:
#Add calculated area and difference % to the dataframe
field = field.withColumn('areaCalculated', getAreaOfMultiPoly_udf(field.boundary))
field = field.withColumn('areaDifferencePercentage', pys.abs(calculateDifferencePercentageInArea(field.area, field.areaCalculated)))
field = field.withColumn('isBoundaryPresent', checkIfBoundaryPresent(field.boundary))
field = field.withColumn("grower_id", pys.explode(field.grower_id))
field = field.withColumn("farm_id", pys.explode(field.farm_id))

In [0]:
field.printSchema()

root
 |-- id: string (nullable = true)
 |-- area: double (nullable = true)
 |-- boundary: string (nullable = true)
 |-- grower_id: string (nullable = true)
 |-- farm_id: string (nullable = true)
 |-- areaCalculated: double (nullable = true)
 |-- areaDifferencePercentage: double (nullable = true)
 |-- isBoundaryPresent: boolean (nullable = true)



In [0]:
countries = sqlContext.read.json(countriesDir)
countries = countries.select('country.id','country.alpha3')


In [0]:
growers = sqlContext.read.json(growerDir)
growers = growers.select('grower.id', 'grower.address.country_id').withColumnRenamed('id','grower_id_temp')

In [0]:
growerToCountryMap = growers.join(countries, growers.country_id==countries.id).drop('country_id', 'id').withColumnRenamed('alpha3','country')
growerToCountryMap.show()
#Free the memory!
growers = None
countries = None

+--------------------+-------+
|      grower_id_temp|country|
+--------------------+-------+
|005310e7-033d-11e...|    USA|
|0025a42a-0339-11e...|    USA|
|005fae69-4843-11e...|    USA|
|002864d1-18f0-11e...|    USA|
|0053ccc1-0cc5-11e...|    USA|
|0008d558-55c8-11e...|    USA|
|0017f369-ae8e-481...|    USA|
|006d943d-e1a5-4a4...|    USA|
|00220330-39e3-4de...|    USA|
|0025d949-339a-4f7...|    USA|
|006f2ddf-8b8e-479...|    USA|
|006b811e-f95c-4bf...|    USA|
|00568de2-7a8d-4b9...|    USA|
|005814d2-474b-460...|    USA|
|000b19ef-610d-40f...|    USA|
|0035d4c6-cce4-4dd...|    USA|
|005c4669-914b-4d1...|    USA|
|00449481-446f-45e...|    USA|
|0037443e-fef7-42a...|    USA|
|0029be28-d83e-444...|    USA|
+--------------------+-------+
only showing top 20 rows



In [0]:
field = field.join(growerToCountryMap, growerToCountryMap.grower_id_temp == field.grower_id, 'left_outer').drop('grower_id_temp')

In [0]:
field.printSchema()
field.show()

root
 |-- id: string (nullable = true)
 |-- area: double (nullable = true)
 |-- boundary: string (nullable = true)
 |-- grower_id: string (nullable = true)
 |-- farm_id: string (nullable = true)
 |-- areaCalculated: double (nullable = true)
 |-- areaDifferencePercentage: double (nullable = true)
 |-- isBoundaryPresent: boolean (nullable = true)
 |-- country: string (nullable = true)

+--------------------+-----------------+--------------------+--------------------+--------------------+------------------+------------------------+-----------------+-------+
|                  id|             area|            boundary|           grower_id|             farm_id|    areaCalculated|areaDifferencePercentage|isBoundaryPresent|country|
+--------------------+-----------------+--------------------+--------------------+--------------------+------------------+------------------------+-----------------+-------+
|000322fc-0da0-4ac...|7.396792705027968|MULTIPOLYGON (((-...|af09ead0-1105-45e...|b591e579-

In [0]:
sample = sqlContext.read.json(sampleDir)
sample = sample.select('sample_event.id', 'sample_event.associations.`agrian.field`', 'sample_event.sample_event_type').toDF('field_id_temp','sample_event_id', 'event_type')



In [0]:
sample = sample.groupBy('field_id_temp').agg(pys.count("event_type").alias("sample_event_count"))

In [0]:
sample.show()

+--------------------+------------------+
|       field_id_temp|sample_event_count|
+--------------------+------------------+
|00458b4d-ae1a-441...|                 1|
|00414ec4-d7ca-431...|                 1|
|00458bc1-c715-465...|                 1|
|00261c83-5935-4ff...|                 1|
|00935acb-6da0-432...|                 1|
|00455f83-08fb-40b...|                 1|
|00414c5c-2bb9-4aa...|                 1|
+--------------------+------------------+



In [0]:
field = field.join(sample, sample.field_id_temp == field.id, 'left_outer').drop('field_id_temp')

In [0]:
field.printSchema()
field.show()

root
 |-- id: string (nullable = true)
 |-- area: double (nullable = true)
 |-- boundary: string (nullable = true)
 |-- grower_id: string (nullable = true)
 |-- farm_id: string (nullable = true)
 |-- areaCalculated: double (nullable = true)
 |-- areaDifferencePercentage: double (nullable = true)
 |-- isBoundaryPresent: boolean (nullable = true)
 |-- country: string (nullable = true)
 |-- sample_event_count: long (nullable = true)

+--------------------+-----------------+--------------------+--------------------+--------------------+------------------+------------------------+-----------------+-------+------------------+
|                  id|             area|            boundary|           grower_id|             farm_id|    areaCalculated|areaDifferencePercentage|isBoundaryPresent|country|sample_event_count|
+--------------------+-----------------+--------------------+--------------------+--------------------+------------------+------------------------+-----------------+-------+------