In [3]:
# Required packages and libraries
import pyspark
import pyspark.pandas as ps
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, lit, when
from pyspark.sql.types import *
import warnings

#filter out warnings
warnings.filterwarnings("ignore")

In [4]:
# Initalize spark session
spark = SparkSession.builder \
    .appName("F-A-O") \
    .config("spark.driver.maxResultSize", "4g") \
    .config("spark.executor.memory", "4g") \
    .getOrCreate()


In [5]:
# Set the maxToStringFields property
spark.conf.set("spark.sql.debug.maxToStringFields", "100")

In [6]:
# Get the dataset
prod_data = spark\
            .read\
            .options(inferSchema="true", header="true")\
            .csv("Production_Crops_Livestock_E_Africa.csv")

There are columns that do not add direct context to my analysis. I will drop them.

In [7]:
# Drop columns

# drop flag cols
to_drop = [col for col in prod_data.columns if col.endswith("F")]

# drop unnecessary cols
to_drop1 = ['Area Code','Area Code (M49)','Item Code','Item Code (CPC)','Element','Element Code','Unit']

# implement
prod_data = (prod_data\
            .drop(*to_drop, *to_drop1))

In [8]:
# Rename columns
#rename year cols
renamed_cols = [col.replace('Y', '') for col in prod_data.columns]
prod_data = prod_data.toDF(*renamed_cols)

#rename other cols
prod_data = prod_data\
                .withColumnRenamed("Area", "Country")


In [9]:
# Trim column names
prod_data = prod_data.select([col(name).alias(name.strip()) for name in prod_data.columns])

root
 |-- Country: string (nullable = true)
 |-- Item: string (nullable = true)
 |-- 1961: double (nullable = true)
 |-- 1962: double (nullable = true)
 |-- 1963: double (nullable = true)
 |-- 1964: double (nullable = true)
 |-- 1965: double (nullable = true)
 |-- 1966: double (nullable = true)
 |-- 1967: double (nullable = true)
 |-- 1968: double (nullable = true)
 |-- 1969: double (nullable = true)
 |-- 1970: double (nullable = true)
 |-- 1971: double (nullable = true)
 |-- 1972: double (nullable = true)
 |-- 1973: double (nullable = true)
 |-- 1974: double (nullable = true)
 |-- 1975: double (nullable = true)
 |-- 1976: double (nullable = true)
 |-- 1977: double (nullable = true)
 |-- 1978: double (nullable = true)
 |-- 1979: double (nullable = true)
 |-- 1980: double (nullable = true)
 |-- 1981: double (nullable = true)
 |-- 1982: double (nullable = true)
 |-- 1983: double (nullable = true)
 |-- 1984: double (nullable = true)
 |-- 1985: double (nullable = true)
 |-- 1986: double (n

In [10]:
# Melt dataframe
#convert spark to pandas df
prod_data = ps.DataFrame(prod_data)

In [11]:
#melt the pandas df
keep_columns=['Country', 'Item']
prod_data=prod_data.melt(id_vars=keep_columns, var_name='Year',value_name='Weight')
prod_data.head()

                                                                                

Unnamed: 0,Country,Item,Year,Weight
0,Algeria,"Almonds, in shell",1961,13300.0
1,Algeria,"Almonds, in shell",1962,13300.0
2,Algeria,"Almonds, in shell",1963,13300.0
3,Algeria,"Almonds, in shell",1964,14200.0
4,Algeria,"Almonds, in shell",1965,13800.0


I converted the dataframe to pandas dataframe in order to melt it. I could not get along with unpivoting the dataframe with pyspark. If you know how, kindly reach out.
Now I'll have to covert the dataframe back to spark DF for further processing.

In [12]:
# Convert pandas dataframe to RDD 
r_dd = spark.sparkContext.parallelize(prod_data.values.tolist())

#Schema for the pyspark dataFrame
schema = StructType([
    StructField("Country", StringType(), nullable=False),
    StructField("Item", StringType(), nullable=False),
    StructField("Year", StringType(), nullable=False),
    StructField("Weight", DoubleType(), nullable=False)
])

#create pyspark dataFrame
prod_data = spark.createDataFrame(r_dd, schema=schema)

In [13]:
# Create new column
prod_data = prod_data.withColumn("Category",lit(""))

In [16]:
# Categorize items
grains=['Wheat and products', 'Rice (Milled Equivalent)','Barley and products','Maize and products',
        'Millet and products','Cereals, Other','Cereals - Excluding Beer','Rye and products','Oats',
        'Sorghum and products','Cassava and products','Cereals n.e.c.','Wheat']
vegetables=['Potatoes and products','Vegetables, Other','Starchy Roots','Vegetables','Sweet potatoes','Roots, Other',
            'Onions','Plantains','Pimento','Aquatic Plants','Yams','Potatoes','Roots and Tubers, Total',
           'Eggplants (aubergines)','Cauliflowers and broccoli','Edible roots and tubers with high starch or inulin content, n.e.c., fresh',
           'Mushrooms and truffles','Other vegetables, fresh n.e.c.']
sugars=['Sugar (Raw Equivalent)','Sweeteners, Other','Sugar Crops','Sugar & Sweeteners','Sugar non-centrifugal',
        'Sugar beet','Sugar cane','Molasses']
fruits=['Olives-Including Preserved','Tomatoes and products','Oranges, Mandarines','Citrus, Other','Bananas',
        'Apples and products','Pineapples and products','Dates','Grapes and Products-Excluding Wine','Fruits, Other',
        'Fruits - Excluding Wine','Lemons, Limes and products','Grapefruit and products','Pineapples',
        'Plums and sloes','Watermelons']
legumes=['Pulses, Other and products','Coffee and products','Cocoa Beans and products','Pulses','Beans','Peas',
         'Soyabeans','Groundnuts-Shelled Eq','Cocoa beans','Broad beans and horse beans, green','Other beans, green',
        '|Lentils, dry','Okra','Cashew nuts, in shell']
seeds=['Sesame seed', 'Rape and Mustardseed','Palm kernels','Sunflower seed','Cottonseed','Castor oil seeds',
      'Rape or colza seed']
nuts=['Nuts and products','Almonds, in shell','Coconuts - Incl Copra','Treenuts']
oils_fats=['Soyabean Oil','Groundnut Oil','Sunflowerseed Oil','Rape and Mustard Oil','Cottonseed Oil','Palm Oil',
           'Sesameseed Oil','Olive Oil','Oilcrops Oil, Other','Oilcrops','Vegetable Oils','Oilcrops, Other',
           'Maize Germ Oil','Coconut Oil','Palmkernel Oil','Ricebran Oil','Fish, Body Oil','Fish, Liver Oil',
           'Animal fats','Fats, Animals, Raw','Olive oil','Rapeseed or canola oil, crude',
           'Olives','Oil of palm kernel','Palm oil']
beverages=['Tea-Including Mate','Wine','Beer','Beverages, Alcoholic','Alcoholic Beverages','Beverages, Fermented',
           'Coffee, green']
spices=['Pepper','Spices, Other','Spices','Cloves','Green garlic']
meat=['Bovine Meat','Mutton & Goat Meat','Meat, Other','Meat','Pigmeat']
sea_food=['Freshwater Fish','Fish, Seafood','Demersal Fish','Pelagic Fish','Marine Fish, Other','Crustaceans',
          'Cephalopods','Molluscs, Other','Aquatic Animals, Others','Aquatic Products, Other',
          'Meat, Aquatic Mammals']
dairy=['Butter, Ghee','Cream','Milk - Excluding Butter','Infant food','Butter of cow milk']
poultry=['Eggs','Poultry Meat','Turkeys','Chickens','Hen eggs in shell, fresh']
other_animal_products=['Honey','Bees','Offals','Offals, Edible','Edible offals of horses and other equines,  fresh, chilled or frozen',
                      'Raw hides and skins of sheep or lambs','Other meat n.e.c. (excluding mammals), fresh, chilled or frozen',
                      'Edible offal of cattle, fresh, chilled or frozen']
miscellaneous=['Stimulants','Miscellaneous']
other_plant_products=['Flax, processed but not spun','Unmanufactured tobacco','Sisal, raw']

# Pattern matching with regex
grains_regex = '|'.join(grains)
vegetables_regex = '|'.join(vegetables)
sugars_regex= '|'.join(sugars)
fruits_regex = '|'.join(fruits)
legumes_regex = '|'.join(legumes)
seeds_regex = '|'.join(seeds)
nuts_regex = '|'.join(nuts)
oils_fats_regex = '|'.join(oils_fats)
beverages_regex = '|'.join(beverages)
spices_regex = '|'.join(spices)
meat_regex = '|'.join(meat)
sea_food_regex = '|'.join(sea_food)
dairy_regex = '|'.join(dairy)
poultry_regex = '|'.join(poultry)
other_animal_products_regex = '|'.join(other_animal_products)
miscellaneous_regex = '|'.join(miscellaneous)

# Update "Category" column based on the values in "Item" column
prod_data = prod_data.withColumn("Category",
                                 when(col("Item").rlike(grains_regex), lit("Grains"))
                                 .when(col("Item").rlike(vegetables_regex), lit("Vegetables"))
                                 .when(col("Item").rlike(sugars_regex), lit("Sugar"))
                                 .when(col("Item").rlike(fruits_regex), lit("Fruits"))
                                 .when(col("Item").rlike(legumes_regex), lit("Legumes"))
                                 .when(col("Item").rlike(seeds_regex), lit("Seeds"))
                                 .when(col("Item").rlike(nuts_regex), lit("Nuts"))
                                 .when(col("Item").rlike(oils_fats_regex), lit("Oil_Fats"))
                                 .when(col("Item").rlike(beverages_regex), lit("Beverages"))
                                 .when(col("Item").rlike(spices_regex), lit("Spices"))
                                 .when(col("Item").rlike(meat_regex), lit("Meat"))
                                 .when(col("Item").rlike(sea_food_regex), lit("Sea_Food"))
                                 .when(col("Item").rlike(dairy_regex), lit("Dairy"))
                                 .when(col("Item").rlike(poultry_regex), lit("Poultry"))
                                 .when(col("Item").rlike(other_animal_products_regex), lit("Other_Animal_Products"))
                                 .when(col("Item").rlike(miscellaneous_regex), lit("Miscellaneous"))
                                 .otherwise(col("Category")))

In [18]:
item_counts = prod_data.filter(col("Category") == "").groupBy("Item").count()
item_counts.show(100, False)

23/06/05 14:33:52 WARN TaskSetManager: Stage 10 contains a task of very large size (2928 KiB). The maximum recommended task size is 1000 KiB.
[Stage 10:>                                                       (0 + 16) / 16]

+-------------------------------------------------------------------------+-----+
|Item                                                                     |count|
+-------------------------------------------------------------------------+-----+
|Butter of cow milk                                                       |1708 |
|Olive oil                                                                |366  |
|Rapeseed or canola oil, crude                                            |427  |
|Bees                                                                     |1708 |
|Turkeys                                                                  |732  |
|Pineapples                                                               |6161 |
|Edible offals of horses and other equines,  fresh, chilled or frozen     |1647 |
|Potatoes                                                                 |8174 |
|Roots and Tubers, Total                                                  |10248|
|Green garlic   



In [None]:
# Meditation HQ
#======================================
# .withColumnRenamed("year", trim(col("year")).alias("year"))
# keep_columns = ['Country', 'Item', 'Element']
# other_columns = [col(column) for column in prod_data.columns if column not in keep_columns]
# other_df = prod_data.select(*other_columns)
# other_df.printSchema()

#===============================
# # Assuming you have a DataFrame named 'df' with the year columns and other columns
# keep_columns = ['Area', 'Item', 'Element']
# year_columns = [col for col in prod_data.columns if col not in keep_columns]

#==========================
# # Melt the year columns
# prod_data = prod_data.select(keep_columns + [
#     explode(
#         [
#             (lit(col).alias('year'), col)
#             for col in year_columns
#         ]
#     ).alias('melted')
# ]).select(keep_columns + [
#     col('melted.year'),
#     col('melted.weight')
# ])
# prod_data.show()

#=========================
# melted_df = renamed_df.selectExpr("stack(61, " + ", ".join([f"'{col}', {col}" for col in renamed_df.columns]) + ") as (year, weight)")

#====================
# from pyspark.sql.functions import expr
# melted_df = other_df.selectExpr("posexplode(array(*)) as (year, weight)").select("year", "weight")
# melted_df.show()

#===================
# cols = other_df.columns
# other_df = other_df.selectExpr("stack({},{})".format(len(cols), ','.join(("'{}'".format(i) for i in cols))))
# other_df.show()

#===================
# # Configure output partitions
# spark.conf.set("spark.sql.shuffle.partitions", "8")

#=================
# #partitionSizes = prod_data.rdd.glom().map(len).collect()
# print("Size of partitions:", partitionSizes)

#==============
# numPartitions = prod_data.rdd.getNumPartitions()
# print("Number of partitions:", numPartitions)

#===================
# repartitionedDF = prod_data.repartition(32)

#==============
# from pyspark import SparkContext
# sc = SparkContext.getOrCreate()
# executor_memory = sc.getConf().get("spark.executor.memory")
# print("Executor Memory:", executor_memory)

#=================
#prod_data.explain("formatted")

#=========================
# Set the maxPartitionBytes property
# spark.conf.set("spark.sql.files.maxPartitionBytes", "256m")

#====================
# # Get unique items in the "Name" column
# unique_names = prod_data.select("Item").distinct()
# # Show the unique names
# unique_names.show()









