## Exploring correlations between weather and crop yield

In [1]:
import os
import re
import urllib.request
import zipfile

from pyspark.sql import SparkSession
from pyspark.sql.functions import monotonically_increasing_id, regexp_replace, col
from meteostat import Stations

In [2]:
# Reuse an existing Spark session if one is running, otherwise start one for this notebook.
spark = (
    SparkSession.builder
    .appName("FAOSTAT Data Correlations")
    .getOrCreate()
)

In [3]:
# Fetch the FAOSTAT bulk archive only when it is not already on disk, so that
# "Restart & Run All" does not re-download it every time.
# The download goes to a temporary ".part" file that is renamed only after it
# completes. An interrupted run therefore never leaves a truncated zip under
# the final name, which the guard would otherwise skip from then on.
file_url = "https://bulks-faostat.fao.org/production/Production_Crops_Livestock_E_All_Data.zip"
local_zip_path = "Production_Crops_Livestock_E_All_Data.zip"
if not os.path.exists(local_zip_path):
    partial_zip_path = local_zip_path + ".part"
    urllib.request.urlretrieve(file_url, partial_zip_path)
    os.replace(partial_zip_path, local_zip_path)

('Production_Crops_Livestock_E_All_Data.zip',
 <http.client.HTTPMessage at 0x1f874ad0bf0>)

In [4]:
# Unpack the downloaded archive into /tmp/data; the CSV is read from there below.
with zipfile.ZipFile(local_zip_path, mode="r") as zip_ref:
    zip_ref.extractall(path="/tmp/data")

In [5]:
# Load the extracted FAOSTAT CSV.
# - header=True takes column names from the first row.
# - inferSchema=True makes Spark scan the data to guess column types
#   (e.g. the numeric per-year values) instead of reading everything as strings.
csv_file_path = "/tmp/data/Production_Crops_Livestock_E_All_Data.csv"
df = spark.read.csv(
    csv_file_path,
    header=True,
    inferSchema=True,
)

### Explore one country

- First, let's look at weather and crop yield for a single country, in this case **Mexico**.
- I am going to reshape the data from wide format (one column per year) to long format, with a single **Year** column plus **Yield**, **Year Flag** and **Year Note** columns. Note that **Yield** holds the value for the row's **Element** (e.g. *Area harvested*, in ha), so it is not necessarily a crop yield.
- Although this significantly increases the number of rows, I believe the long format will be easier to combine with other datasets, since we can group and join on the **Year** column instead of handling each year column separately.

In [6]:
# Tag every source row with a unique "Id", so the three melted views below
# (values, flags, notes) can be joined back together row-for-row.
# monotonically_increasing_id() is documented as non-deterministic because it
# depends on partitioning, and without caching each melt branch would re-read
# the CSV and recompute it. Caching materializes the Ids once, so all branches
# see the same values; it also avoids re-parsing the CSV for every branch.
df = df.withColumn("Id", monotonically_increasing_id()).cache()

In [7]:
# Descriptor (non-year) columns of the FAOSTAT table, plus the synthetic row "Id".
# NOTE(review): not referenced by the cells shown here - confirm it is used later or remove it.
columns = [
    "Id",
    "Area Code", "Area Code (M49)", "Area",
    "Item Code", "Item Code (CPC)", "Item",
    "Element Code", "Element",
    "Unit",
]

In [8]:
# The wide FAOSTAT layout stores each year in three columns: "Y1961" (value),
# "Y1961F" (flag) and "Y1961N" (note). Anchored patterns select exactly those,
# so a descriptor column that merely starts with "Y" or ends with "F"/"N"
# cannot be mistaken for year data.
# The loop variable is `name` rather than `col` to avoid visually shadowing pyspark's `col`.
year_columns = [name for name in df.columns if re.fullmatch(r"Y\d{4}", name)]
flag_columns = [name for name in df.columns if re.fullmatch(r"Y\d{4}F", name)]
note_columns = [name for name in df.columns if re.fullmatch(r"Y\d{4}N", name)]
assert year_columns, "no Y#### year columns found - has the FAOSTAT layout changed?"

In [9]:
# Unpivot the per-year value columns into long format: one row per source row
# and year, with the value in "Yield". All non-year columns are kept as ids.
df_transform_year = (
    df.melt(
        ids=[name for name in df.columns
             if name not in {*year_columns, *flag_columns, *note_columns}],
        values=year_columns,
        variableColumnName="Year",
        valueColumnName="Yield",
    )
    # "Y1961" -> 1961. Built in a scratch column and renamed, so "Year" ends up last.
    .withColumn("Final Year", regexp_replace(col("Year"), "Y", "").cast("int"))
    .drop("Year")
    .withColumnRenamed("Final Year", "Year")
)
df_transform_year.show()

+---------+---------------+-----------+---------+---------------+-----------------+------------+--------------+----+---+------+----+
|Area Code|Area Code (M49)|       Area|Item Code|Item Code (CPC)|             Item|Element Code|       Element|Unit| Id| Yield|Year|
+---------+---------------+-----------+---------+---------------+-----------------+------------+--------------+----+---+------+----+
|        2|           '004|Afghanistan|      221|         '01371|Almonds, in shell|        5312|Area harvested|  ha|  0|   0.0|1961|
|        2|           '004|Afghanistan|      221|         '01371|Almonds, in shell|        5312|Area harvested|  ha|  0|   0.0|1962|
|        2|           '004|Afghanistan|      221|         '01371|Almonds, in shell|        5312|Area harvested|  ha|  0|   0.0|1963|
|        2|           '004|Afghanistan|      221|         '01371|Almonds, in shell|        5312|Area harvested|  ha|  0|   0.0|1964|
|        2|           '004|Afghanistan|      221|         '01371|Almo

In [10]:
# Same unpivot as for the values, but over the per-year flag columns;
# the flag ends up in "Year Flag".
df_transform_flag = (
    df.melt(
        ids=[name for name in df.columns
             if name not in {*year_columns, *flag_columns, *note_columns}],
        values=flag_columns,
        variableColumnName="Year",
        valueColumnName="Year Flag",
    )
    # "Y1961F" -> 1961 (strip both the "Y" prefix and the "F" suffix).
    .withColumn("Final Year", regexp_replace(col("Year"), "Y|F", "").cast("int"))
    .drop("Year")
    .withColumnRenamed("Final Year", "Year")
)
df_transform_flag.show()

+---------+---------------+-----------+---------+---------------+-----------------+------------+--------------+----+---+---------+----+
|Area Code|Area Code (M49)|       Area|Item Code|Item Code (CPC)|             Item|Element Code|       Element|Unit| Id|Year Flag|Year|
+---------+---------------+-----------+---------+---------------+-----------------+------------+--------------+----+---+---------+----+
|        2|           '004|Afghanistan|      221|         '01371|Almonds, in shell|        5312|Area harvested|  ha|  0|        A|1961|
|        2|           '004|Afghanistan|      221|         '01371|Almonds, in shell|        5312|Area harvested|  ha|  0|        A|1962|
|        2|           '004|Afghanistan|      221|         '01371|Almonds, in shell|        5312|Area harvested|  ha|  0|        A|1963|
|        2|           '004|Afghanistan|      221|         '01371|Almonds, in shell|        5312|Area harvested|  ha|  0|        A|1964|
|        2|           '004|Afghanistan|      221

In [11]:
# Same unpivot again, over the per-year note columns; the note ends up in "Year Note".
df_transform_note = (
    df.melt(
        ids=[name for name in df.columns
             if name not in {*year_columns, *flag_columns, *note_columns}],
        values=note_columns,
        variableColumnName="Year",
        valueColumnName="Year Note",
    )
    # "Y1961N" -> 1961 (strip both the "Y" prefix and the "N" suffix).
    .withColumn("Final Year", regexp_replace(col("Year"), "Y|N", "").cast("int"))
    .drop("Year")
    .withColumnRenamed("Final Year", "Year")
)
df_transform_note.show()

+---------+---------------+-----------+---------+---------------+-----------------+------------+--------------+----+---+---------+----+
|Area Code|Area Code (M49)|       Area|Item Code|Item Code (CPC)|             Item|Element Code|       Element|Unit| Id|Year Note|Year|
+---------+---------------+-----------+---------+---------------+-----------------+------------+--------------+----+---+---------+----+
|        2|           '004|Afghanistan|      221|         '01371|Almonds, in shell|        5312|Area harvested|  ha|  0|     NULL|1961|
|        2|           '004|Afghanistan|      221|         '01371|Almonds, in shell|        5312|Area harvested|  ha|  0|     NULL|1962|
|        2|           '004|Afghanistan|      221|         '01371|Almonds, in shell|        5312|Area harvested|  ha|  0|     NULL|1963|
|        2|           '004|Afghanistan|      221|         '01371|Almonds, in shell|        5312|Area harvested|  ha|  0|     NULL|1964|
|        2|           '004|Afghanistan|      221

In [12]:
# Re-attach flags and notes to the per-year values.
# (Id, Year) already identifies a row uniquely: Id is unique per source row,
# and each melt emits one row per year. Joining on just those two keys avoids
# shuffling and comparing all ten descriptor columns. It also avoids silently
# losing flags/notes for any row whose descriptor contains NULL, because NULL
# never equals NULL in a join condition.
df_combined = (
    df_transform_year
    .join(df_transform_flag.select("Id", "Year", "Year Flag"), on=["Id", "Year"], how="left")
    .join(df_transform_note.select("Id", "Year", "Year Note"), on=["Id", "Year"], how="left")
    # Keep the column order the previous all-column join produced.
    .select(
        "Id", "Area Code", "Area Code (M49)", "Area", "Item Code", "Item Code (CPC)", "Item",
        "Element Code", "Element", "Unit", "Year",
        "Yield", "Year Flag", "Year Note",
    )
)

df_combined.columns

['Id',
 'Area Code',
 'Area Code (M49)',
 'Area',
 'Item Code',
 'Item Code (CPC)',
 'Item',
 'Element Code',
 'Element',
 'Unit',
 'Year',
 'Yield',
 'Year Flag',
 'Year Note']