In [89]:
from pyspark.sql import SQLContext
from pyspark.sql import SparkSession
import plotly as py
import plotly.graph_objs as go
import ipywidgets as widgets
from pyspark.sql.types import IntegerType
from pyspark.sql.window import Window
from pyspark.sql.functions import avg, max, min, col, round, sum, row_number

In [90]:
spark = SparkSession\
    .builder\
    .appName("Python Spark SQL basic example")\
    .config("spark.some.config.option", "some-value")\
    .getOrCreate()

In [4]:
df_crop_temp = spark.read.csv("Production_Crops_E_All_Data_(Normalized).csv")
df_nutrient_temp = spark.read.csv("Inputs_FertilizersNutrient_E_All_Data_(Normalized).csv")
print(df_crop_temp.head())
print(df_nutrient_temp.head())

In [60]:
df_crop_temp = df_crop_temp.drop('_c10', '_c6', '_c4', '_c5')
df_nutrient_temp = df_nutrient_temp.drop('_c10', '_c6', '_c4', '_c1')
df_crop_temp = df_crop_temp.withColumn("_c9", df_crop_temp["_c9"].cast(IntegerType()))
df_crop_temp = df_crop_temp.withColumn("_c7", df_crop_temp["_c7"].cast(IntegerType()))
df_nutrient_temp = df_nutrient_temp.withColumn("_c9", df_nutrient_temp["_c9"].cast(IntegerType()))
df_nutrient_temp = df_nutrient_temp.withColumn("_c7", df_nutrient_temp["_c7"].cast(IntegerType()))
df_crop_temp = df_crop_temp.filter(df_crop_temp._c9 > 0)

In [61]:
df_plot = df_crop_temp.filter(df_crop_temp._c8 == 'hg/ha')

In [62]:
df_crop = df_crop_temp.selectExpr("_c0 as country_id", "_c1 as country", "_c2 as crop_id", "_c3 as crop", "_c7 as year", "_c8 as unit", "_c9 as yield")
df_nutrient = df_nutrient_temp.selectExpr("_c0 as n_country_id","_c2 as nutrient_id","_c3 as nutrient","_c5 as element","_c7 as n_year","_c8 as n_unit", "_c9 as quantity")
df_nutrient = df_nutrient.filter(df_nutrient.element == 'Agricultural Use')
df_crop.printSchema()

root
 |-- country_id: string (nullable = true)
 |-- country: string (nullable = true)
 |-- crop_id: string (nullable = true)
 |-- crop: string (nullable = true)
 |-- year: integer (nullable = true)
 |-- unit: string (nullable = true)
 |-- yield: integer (nullable = true)



In [63]:
df_land = df_crop.filter(df_crop.unit == 'ha')
df_land = df_land.filter(df_land.year > 1999)
df_land = df_land.select("country_id", "country", "year", "yield")
w6 = Window.partitionBy(df_land.country, df_land.year).orderBy(df_land.country,  df_land.year)
df_land = df_land.select("country_id", "country", "year", sum('yield').over(w6).alias('yield')).distinct()
#w6 = Window.partitionBy(df_land.country, df_land.crop).orderBy(df_land.country,  df_land.crop)
#df_land = df_land.select("country_id", "country", "crop", avg('yield').over(w6).alias('yield')).distinct()
#df_land = df_land.select("country_id", "country", "crop", round('yield').alias('yield'))
#w7 = Window.partitionBy(df_land.country).orderBy(df_land.country)
#df_land = df_land.select("country_id", "country", sum('yield').over(w7).alias('yield')).distinct()
df_land.count()

4181

In [64]:
df_land.show(3)

+----------+----------+----+--------+
|country_id|   country|year|   yield|
+----------+----------+----+--------+
|         9| Argentina|2010|52567628|
|        52|Azerbaijan|2012| 3940675|
|        13|   Bahrain|2014|    5785|
+----------+----------+----+--------+
only showing top 3 rows



In [65]:
df_land_nutrient = df_land.join(df_nutrient, [df_land.country_id == df_nutrient.n_country_id, df_land.year == df_nutrient.n_year]).drop('n_country_id','element','n_year','n_unit')
df_land_nutrient = df_land_nutrient.withColumn("tonnes/ha", col('quantity') / col('yield'))
df_land_nutrient = df_land_nutrient.select("country_id", "country", "year", "nutrient_id", "nutrient", (col("tonnes/ha")*1000).alias("kg/ha"))
df_land_nutrient = df_land_nutrient.select("country_id", "country", "year", "nutrient_id", "nutrient", round("kg/ha", 6).alias("kg/ha"))
df_land_nutrient = df_land_nutrient.select(col("country_id").alias("n_country_id"), col("country").alias("n_country"), col("year").alias("n_year"), "nutrient_id", "nutrient", "kg/ha")
df_land_nutrient.show()

+------------+--------------------+------+-----------+--------------------+----------+
|n_country_id|           n_country|n_year|nutrient_id|            nutrient|     kg/ha|
+------------+--------------------+------+-----------+--------------------+----------+
|           9|           Argentina|  2010|       3104|Nutrient potash K...|  0.631834|
|           9|           Argentina|  2010|       3103|Nutrient phosphat...| 13.005476|
|           9|           Argentina|  2010|       3102|Nutrient nitrogen...| 14.671482|
|          52|          Azerbaijan|  2012|       3104|Nutrient potash K...|  0.655218|
|          52|          Azerbaijan|  2012|       3103|Nutrient phosphat...|  1.538061|
|          52|          Azerbaijan|  2012|       3102|Nutrient nitrogen...|  6.988904|
|          13|             Bahrain|  2014|       3104|Nutrient potash K...|233.362143|
|          13|             Bahrain|  2014|       3103|Nutrient phosphat...| 25.929127|
|          13|             Bahrain|  2014| 

In [66]:
df_land_nutrient_3104 = df_land_nutrient.select(col('n_country_id').alias('cid_3104'),col('n_country').alias('c_3104'),col('n_year').alias('y_3104'),col('nutrient_id').alias('nid_3104'),col('nutrient').alias('n_3104'),col('kg/ha').alias('potash')).where(col('nutrient_id') == '3104')
df_land_nutrient_3104.show()

+--------+--------------------+------+--------+--------------------+----------+
|cid_3104|              c_3104|y_3104|nid_3104|              n_3104|    potash|
+--------+--------------------+------+--------+--------------------+----------+
|       9|           Argentina|  2010|    3104|Nutrient potash K...|  0.631834|
|      52|          Azerbaijan|  2012|    3104|Nutrient potash K...|  0.655218|
|      13|             Bahrain|  2014|    3104|Nutrient potash K...|233.362143|
|      44|            Colombia|  2014|    3104|Nutrient potash K...| 41.311986|
|      49|                Cuba|  2009|    3104|Nutrient potash K...|  5.517703|
|     102|Iran (Islamic Rep...|  2016|    3104|Nutrient potash K...|  2.323253|
|     108|          Kazakhstan|  2007|    3104|Nutrient potash K...|  0.018226|
|     256|          Luxembourg|  2007|    3104|Nutrient potash K...| 16.774544|
|    5102|       Middle Africa|  2003|    3104|Nutrient potash K...|  0.515913|
|    5102|       Middle Africa|  2006|  

In [67]:
df_land_nutrient_3103 = df_land_nutrient.select(col('n_country_id').alias('cid_3103'),col('n_country').alias('c_3103'),col('n_year').alias('y_3103'),col('nutrient_id').alias('nid_3103'),col('nutrient').alias('n_3103'),col('kg/ha').alias('phosphate')).where(col('nutrient_id') == '3103')
df_land_nutrient_3103.show()

+--------+--------------------+------+--------+--------------------+---------+
|cid_3103|              c_3103|y_3103|nid_3103|              n_3103|phosphate|
+--------+--------------------+------+--------+--------------------+---------+
|       9|           Argentina|  2010|    3103|Nutrient phosphat...|13.005476|
|      52|          Azerbaijan|  2012|    3103|Nutrient phosphat...| 1.538061|
|      13|             Bahrain|  2014|    3103|Nutrient phosphat...|25.929127|
|      44|            Colombia|  2014|    3103|Nutrient phosphat...|27.208704|
|      49|                Cuba|  2009|    3103|Nutrient phosphat...| 2.785058|
|     102|Iran (Islamic Rep...|  2016|    3103|Nutrient phosphat...| 2.588822|
|     108|          Kazakhstan|  2007|    3103|Nutrient phosphat...| 0.415138|
|     256|          Luxembourg|  2007|    3103|Nutrient phosphat...|15.353279|
|    5102|       Middle Africa|  2003|    3103|Nutrient phosphat...| 0.329508|
|    5102|       Middle Africa|  2006|    3103|Nutri

In [68]:
df_land_nutrient_3102 = df_land_nutrient.select(col('n_country_id').alias('cid_3102'),col('n_country').alias('c_3102'),col('n_year').alias('y_3102'),col('nutrient_id').alias('nid_3102'),col('nutrient').alias('n_3102'),col('kg/ha').alias('nitrogen')).where(col('nutrient_id') == '3102')
df_land_nutrient_3102.show()

+--------+--------------------+------+--------+--------------------+----------+
|cid_3102|              c_3102|y_3102|nid_3102|              n_3102|  nitrogen|
+--------+--------------------+------+--------+--------------------+----------+
|       9|           Argentina|  2010|    3102|Nutrient nitrogen...| 14.671482|
|      52|          Azerbaijan|  2012|    3102|Nutrient nitrogen...|  6.988904|
|      13|             Bahrain|  2014|    3102|Nutrient nitrogen...|105.445117|
|      44|            Colombia|  2014|    3102|Nutrient nitrogen...| 76.433447|
|      49|                Cuba|  2009|    3102|Nutrient nitrogen...|  8.431035|
|     102|Iran (Islamic Rep...|  2016|    3102|Nutrient nitrogen...| 29.952593|
|     108|          Kazakhstan|  2007|    3102|Nutrient nitrogen...|  0.759399|
|     256|          Luxembourg|  2007|    3102|Nutrient nitrogen...|120.508758|
|    5102|       Middle Africa|  2003|    3102|Nutrient nitrogen...|  0.539996|
|    5102|       Middle Africa|  2006|  

In [69]:
df_crop = df_crop.filter(df_crop.unit == 'hg/ha')
df_crop = df_crop.filter(df_crop.year > 1999)
df_nutrient = df_nutrient.filter(df_nutrient.n_year > 1999)

In [70]:
w1 =  Window.partitionBy(df_crop.country, df_crop.crop).orderBy(df_crop.country, df_crop.crop)
#df_crop_avg = df_crop.select("country_id", "country", "crop_id", "crop", "unit", avg("yield").over(w1).alias('yield'))
#df_crop_avg = df_crop_avg.select("country_id", "country", "crop_id", "crop", "unit", round("yield").alias('yield')).distinct()
#df_crop_avg.count()
df_crop_max = df_crop.withColumn('max_value', max("yield").over(w1)).where(col("yield") == col('max_value')).drop('max_value')
#df_crop_max.show()

In [71]:
w2 = Window.partitionBy(df_crop_max.crop)
df_crop_max = df_crop_max.withColumn('max_value', max("yield").over(w2)).where(col("yield") == col('max_value')).drop('max_value')
#w2 = Window.partitionBy(df_crop_avg.crop)
#df_crop_max = df_crop_avg.withColumn('max_value', max("yield").over(w2)).where(col("yield") == col('max_value')).drop('max_value')
df_crop_max.orderBy('crop').show(500)

+----------+--------------------+-------+--------------------+----+-----+----------+
|country_id|             country|crop_id|                crop|year| unit|     yield|
+----------+--------------------+-------+--------------------+----+-----+----------+
|        58|             Ecuador|    800|    Agave fibres nes|2004|hg/ha|     18021|
|       105|              Israel|    221| Almonds, with shell|2013|hg/ha|    402844|
|       102|Iran (Islamic Rep...|    711|Anise, badian, fe...|2013|hg/ha|    117844|
|       211|         Switzerland|    515|              Apples|2011|hg/ha|    782183|
|       198|            Slovenia|    526|            Apricots|2004|hg/ha|    243548|
|       131|            Malaysia|    226|          Areca nuts|2013|hg/ha|     86000|
|      5101|      Eastern Africa|    366|          Artichokes|2002|hg/ha|    585157|
|       102|Iran (Islamic Rep...|    367|           Asparagus|2013|hg/ha|    238605|
|      5303|       Southern Asia|    367|           Asparagus|201

In [72]:
w7 = Window.partitionBy(df_crop_max.crop).orderBy(df_crop_max.crop)
df_crop_max = df_crop_max.withColumn('rn', row_number().over(w7)).where(col('rn') == 1).drop('rn')
df_crop_max.count()

167

In [73]:
w3 =  Window.partitionBy(df_crop.country, df_crop.crop).orderBy(df_crop.country)
df_crop_min = df_crop.withColumn('min_value', min("yield").over(w3)).where(col("yield") == col('min_value')).drop('min_value')
w4 = Window.partitionBy(df_crop_min.crop)
df_crop_min = df_crop_min.withColumn('min_value', min("yield").over(w4)).where(col("yield") == col('min_value')).drop('min_value')
#w4 = Window.partitionBy(df_crop_avg.crop)
#df_crop_min = df_crop_avg.withColumn('min_value', min("yield").over(w4)).where(col("yield") == col('min_value')).drop('min_value')
df_crop_min.show()

+----------+--------------------+-------+--------------------+----+-----+-----+
|country_id|             country|crop_id|                crop|year| unit|yield|
+----------+--------------------+-------+--------------------+----+-----+-----+
|       153|       New Caledonia|   1720|Roots and Tubers,...|2013|hg/ha|12705|
|      5304|  South-Eastern Asia|    101|         Canary seed|2006|hg/ha| 3320|
|       216|            Thailand|    101|         Canary seed|2006|hg/ha| 3320|
|      5300|                Asia|    305|     Tallowtree seed|2001|hg/ha|23194|
|      5302|        Eastern Asia|    305|     Tallowtree seed|2001|hg/ha|23194|
|        41|     China, mainland|    305|     Tallowtree seed|2001|hg/ha|23194|
|      5000|               World|    305|     Tallowtree seed|2001|hg/ha|23194|
|       351|               China|    305|     Tallowtree seed|2001|hg/ha|23194|
|       105|              Israel|    574|          Pineapples|2016|hg/ha|20000|
|      5305|        Western Asia|    574

In [74]:
w8 = Window.partitionBy(df_crop_min.crop).orderBy(df_crop_min.crop)
df_crop_min = df_crop_min.withColumn('rn', row_number().over(w8)).where(col('rn') == 1).drop('rn')
df_crop_min.count()

167

In [75]:
#w5 =  Window.partitionBy(df_nutrient.n_country_id, df_nutrient.nutrient).orderBy(df_nutrient.n_country_id, df_nutrient.nutrient)
#df_nutrient_max = df_nutrient.select("n_country_id","nutrient_id","nutrient","n_unit", avg("quantity").over(w5).alias('quantity'))
#df_nutrient_max = df_nutrient_max.select("n_country_id","nutrient_id","nutrient","n_unit", round("quantity").alias('quantity')).distinct()
#df_nutrient_max.show()

In [76]:
#df_land_nutrient = df_land.join(df_nutrient_max, df_land.country_id == df_nutrient_max.n_country_id).drop('n_country_id')
#df_land_nutrient = df_land_nutrient.withColumn("tonnes/ha", col('quantity') / col('yield'))
#df_land_nutrient.show()

In [77]:
df_crop_nutrient_max = df_crop_max.join(df_land_nutrient_3102, [df_crop_max.country_id == df_land_nutrient_3102.cid_3102, df_crop_max.year == df_land_nutrient_3102.y_3102])
df_crop_nutrient_max = df_crop_nutrient_max.drop('cid_3102','c_3102','y_3102')
#df_crop_nutrient_max = df_crop_nutrient_max.drop('n_country_id','element','n_year')
#df_crop_nutrient_max = df_crop_max.join(df_nutrient_max, df_crop_max.country_id == df_nutrient_max.n_country_id)
#df_crop_nutrient_max = df_crop_nutrient_max.drop('n_country_id','element')
df_crop_nutrient_max.orderBy("crop", "country").show()

+----------+--------------------+-------+--------------------+----+-----+-------+--------+--------------------+-----------+
|country_id|             country|crop_id|                crop|year| unit|  yield|nid_3102|              n_3102|   nitrogen|
+----------+--------------------+-------+--------------------+----+-----+-------+--------+--------------------+-----------+
|        58|             Ecuador|    800|    Agave fibres nes|2004|hg/ha|  18021|    3102|Nutrient nitrogen...|  27.010363|
|       105|              Israel|    221| Almonds, with shell|2013|hg/ha| 402844|    3102|Nutrient nitrogen...|  75.282349|
|       102|Iran (Islamic Rep...|    711|Anise, badian, fe...|2013|hg/ha| 117844|    3102|Nutrient nitrogen...|  38.261166|
|       211|         Switzerland|    515|              Apples|2011|hg/ha| 782183|    3102|Nutrient nitrogen...|  74.206924|
|       198|            Slovenia|    526|            Apricots|2004|hg/ha| 243548|    3102|Nutrient nitrogen...|  70.235046|
|       

In [78]:
df_crop_nutrient_max = df_crop_nutrient_max.join(df_land_nutrient_3103, [df_crop_max.country_id == df_land_nutrient_3103.cid_3103, df_crop_max.year == df_land_nutrient_3103.y_3103])
df_crop_nutrient_max = df_crop_nutrient_max.drop('cid_3103','c_3103','y_3103')

In [79]:
df_crop_nutrient_max = df_crop_nutrient_max.join(df_land_nutrient_3104, [df_crop_nutrient_max.country_id == df_land_nutrient_3104.cid_3104, df_crop_nutrient_max.year == df_land_nutrient_3104.y_3104])
df_crop_nutrient_max = df_crop_nutrient_max.drop('cid_3104','c_3104','y_3104')
df_crop_nutrient_max.drop('country_id','crop_id','unit','nid_3102','n_3102','nid_3103','n_3103','nid_3104','n_3104').orderBy("crop", "country").show()

+--------------------+--------------------+----+-------+-----------+-----------+----------+
|             country|                crop|year|  yield|   nitrogen|  phosphate|    potash|
+--------------------+--------------------+----+-------+-----------+-----------+----------+
|             Ecuador|    Agave fibres nes|2004|  18021|  27.010363|   8.637892| 19.671093|
|              Israel| Almonds, with shell|2013| 402844|  75.282349|  12.359789| 48.155021|
|Iran (Islamic Rep...|Anise, badian, fe...|2013| 117844|  38.261166|   9.802142|  6.526011|
|         Switzerland|              Apples|2011| 782183|  74.206924|  26.069989| 34.268971|
|            Slovenia|            Apricots|2004| 243548|  70.235046|     33.489| 44.204449|
|            Malaysia|          Areca nuts|2013|  86000|  29.863852|  34.570525| 95.389157|
|      Eastern Africa|          Artichokes|2002| 585157|   3.076379|   1.896029|  0.474415|
|       Southern Asia|           Asparagus|2013| 238605|  36.899091|  12.037407|

In [80]:
df_crop_nutrient_min = df_crop_min.join(df_land_nutrient_3102, [df_crop_min.country_id == df_land_nutrient_3102.cid_3102, df_crop_min.year == df_land_nutrient_3102.y_3102])
df_crop_nutrient_min = df_crop_nutrient_min.drop('cid_3102','c_3102','y_3102')
#df_crop_nutrient_min = df_crop_nutrient_min.drop('n_country_id','element','n_year')
#df_crop_nutrient_min = df_crop_min.join(df_nutrient_max, df_crop_min.country_id == df_nutrient_max.n_country_id)
#df_crop_nutrient_min = df_crop_nutrient_min.drop('n_country_id','element')
#df_crop_nutrient_min.orderBy("crop", "country").show()

In [81]:
df_crop_nutrient_min = df_crop_nutrient_min.join(df_land_nutrient_3103, [df_crop_nutrient_min.country_id == df_land_nutrient_3103.cid_3103, df_crop_nutrient_min.year == df_land_nutrient_3103.y_3103])
df_crop_nutrient_min = df_crop_nutrient_min.drop('cid_3103','c_3103','y_3103')

In [82]:
df_crop_nutrient_min = df_crop_nutrient_min.join(df_land_nutrient_3104, [df_crop_nutrient_min.country_id == df_land_nutrient_3104.cid_3104, df_crop_nutrient_min.year == df_land_nutrient_3104.y_3104])
df_crop_nutrient_min = df_crop_nutrient_min.drop('cid_3104','c_3104','y_3104')
df_crop_nutrient_min.drop('country_id','crop_id','unit','nid_3102','n_3102','nid_3103','n_3103','nid_3104','n_3104').orderBy("crop", "country").show()

+-------------------+--------------------+----+-----+----------+---------+---------+
|            country|                crop|year|yield|  nitrogen|phosphate|   potash|
+-------------------+--------------------+----+-----+----------+---------+---------+
|               Cuba|    Agave fibres nes|2015| 3697| 33.699306|15.942967|19.866529|
|          Lithuania|Anise, badian, fe...|2011|  859| 12.464466|16.667396|11.014592|
|            Estonia|              Apples|2004| 2427| 22.980326| 6.866429|10.668875|
|             Poland|            Apricots|2007| 6569| 32.934033|13.328726|15.493959|
|          Indonesia|          Areca nuts|2016| 4134| 33.914374| 9.590186|18.707658|
|        Switzerland|          Artichokes|2004| 3333| 66.310133|23.670624|37.223545|
|            Finland|           Asparagus|2005| 1667| 40.722843|  6.30175| 16.48098|
|         Costa Rica|            Avocados|2013| 5991|118.151222| 22.71053|38.829163|
|              Niger|       Bambara beans|2003| 2574|  0.062192| 

In [83]:
sqlCtx.registerDataFrameAsTable(df_land, "final_df")
df_temp = sqlCtx.sql("SELECT * from final_df where country='Afghanistan' order by year")
df_temp.show()

+----------+-----------+----+--------+
|country_id|    country|year|   yield|
+----------+-----------+----+--------+
|         2|Afghanistan|2000| 7989102|
|         2|Afghanistan|2001| 6997246|
|         2|Afghanistan|2002| 7591284|
|         2|Afghanistan|2003|10067943|
|         2|Afghanistan|2004| 9081042|
|         2|Afghanistan|2005|10233784|
|         2|Afghanistan|2006| 9930715|
|         2|Afghanistan|2007|10040900|
|         2|Afghanistan|2008| 9127392|
|         2|Afghanistan|2009|10619890|
|         2|Afghanistan|2010| 9943679|
|         2|Afghanistan|2011| 9447785|
|         2|Afghanistan|2012|10486763|
|         2|Afghanistan|2013|10583401|
|         2|Afghanistan|2014|11210735|
|         2|Afghanistan|2015| 9484706|
|         2|Afghanistan|2016| 9813082|
+----------+-----------+----+--------+



In [84]:
crop_df = df_plot.select('_c2','_c3').distinct().collect()
crop = []
for i in crop_df:
    crop.append(i[1])
crop = sorted(crop)

In [85]:
country_df = df_plot.select('_c0', '_c1').distinct().collect()
country = []
for i in country_df:
    country.append(i[1])
country = sorted(country)

In [86]:
py.offline.init_notebook_mode(connected=True)
layout = go.Layout(title='Crop Production', yaxis = dict(title='Yield(hg/ha)'), xaxis = dict(title='Year'))

In [87]:
def update_plot(countries, crops):
    data = []
    for i in countries:
        temp = df_plot.filter(df_plot._c1 == i)
        temp = temp.filter(temp._c3 == crops)
        x = temp.toPandas()['_c7']
        y = temp.toPandas()['_c9']
        trace = go.Scatter(x=x, y=y, mode='lines', name=i)
        data.append(trace)
    fig = go.Figure(data=data, layout=layout)
    py.offline.iplot(fig)

countries = widgets.SelectMultiple(options = country, value = (country[0], ), description = 'country')
crops = widgets.Select(options = crop, value = crop[0], description = 'crop')
widgets.interactive(update_plot, countries = countries, crops = crops)

A Jupyter Widget

In [88]:
spark.stop()