In [None]:
pip install pyspark

Looking in indexes: https://pypi.org/simple, https://us-python.pkg.dev/colab-wheels/public/simple/


In [None]:
import pyspark

from pyspark import SparkContext
from pyspark.sql import SparkSession

# Memory settings for the Spark JVM.
# NOTE(review): 45G of driver memory exceeds what a typical Colab VM offers — size to the host machine.
SPARK_EXECUTOR_MEMORY = '8g'
SPARK_DRIVER_MEMORY = '45G'

# Memory must be configured *before* the first session starts the JVM. SparkContext.setSystemProperty()
# initialises the JVM gateway before setting the property, so a driver-memory value set that way
# cannot resize the already-running driver. The builder passes the settings at JVM launch instead.
spark = (
    SparkSession.builder
    .appName("Python Spark")
    .config('spark.executor.memory', SPARK_EXECUTOR_MEMORY)
    .config('spark.driver.memory', SPARK_DRIVER_MEMORY)
    .getOrCreate()
)
# Reuse the session's context rather than creating/fetching a second handle independently.
sc = spark.sparkContext

In [None]:
import pandas as pd

In [None]:
# Cleaned Arabica table from the jldbc/coffee-quality-database GitHub repository.
url = ("https://raw.githubusercontent.com/jldbc/coffee-quality-database"
       "/master/data/arabica_data_cleaned.csv")
raw_arabica = pd.read_csv(url)

In [None]:
# Display the raw pandas frame (rendered as a truncated view in the output below).
raw_arabica

Unnamed: 0.1,Unnamed: 0,Species,Owner,Country.of.Origin,Farm.Name,Lot.Number,Mill,ICO.Number,Company,Altitude,...,Color,Category.Two.Defects,Expiration,Certification.Body,Certification.Address,Certification.Contact,unit_of_measurement,altitude_low_meters,altitude_high_meters,altitude_mean_meters
0,1,Arabica,metad plc,Ethiopia,metad plc,,metad plc,2014/2015,metad agricultural developmet plc,1950-2200,...,Green,0,"April 3rd, 2016",METAD Agricultural Development plc,309fcf77415a3661ae83e027f7e5f05dad786e44,19fef5a731de2db57d16da10287413f5f99bc2dd,m,1950.00,2200.00,2075.00
1,2,Arabica,metad plc,Ethiopia,metad plc,,metad plc,2014/2015,metad agricultural developmet plc,1950-2200,...,Green,1,"April 3rd, 2016",METAD Agricultural Development plc,309fcf77415a3661ae83e027f7e5f05dad786e44,19fef5a731de2db57d16da10287413f5f99bc2dd,m,1950.00,2200.00,2075.00
2,3,Arabica,grounds for health admin,Guatemala,"san marcos barrancas ""san cristobal cuch",,,,,1600 - 1800 m,...,,0,"May 31st, 2011",Specialty Coffee Association,36d0d00a3724338ba7937c52a378d085f2172daa,0878a7d4b9d35ddbf0fe2ce69a2062cceb45a660,m,1600.00,1800.00,1700.00
3,4,Arabica,yidnekachew dabessa,Ethiopia,yidnekachew dabessa coffee plantation,,wolensu,,yidnekachew debessa coffee plantation,1800-2200,...,Green,2,"March 25th, 2016",METAD Agricultural Development plc,309fcf77415a3661ae83e027f7e5f05dad786e44,19fef5a731de2db57d16da10287413f5f99bc2dd,m,1800.00,2200.00,2000.00
4,5,Arabica,metad plc,Ethiopia,metad plc,,metad plc,2014/2015,metad agricultural developmet plc,1950-2200,...,Green,2,"April 3rd, 2016",METAD Agricultural Development plc,309fcf77415a3661ae83e027f7e5f05dad786e44,19fef5a731de2db57d16da10287413f5f99bc2dd,m,1950.00,2200.00,2075.00
...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...
1306,1307,Arabica,juan carlos garcia lopez,Mexico,el centenario,,"la esperanza, municipio juchique de ferrer, ve...",1104328663,terra mia,900,...,,20,"September 17th, 2013",AMECAFE,59e396ad6e22a1c22b248f958e1da2bd8af85272,0eb4ee5b3f47b20b049548a2fd1e7d4a2b70d0a7,m,900.00,900.00,900.00
1307,1308,Arabica,myriam kaplan-pasternak,Haiti,200 farms,,coeb koperativ ekselsyo basen (350 members),,haiti coffee,~350m,...,Blue-Green,16,"May 24th, 2013",Specialty Coffee Association,36d0d00a3724338ba7937c52a378d085f2172daa,0878a7d4b9d35ddbf0fe2ce69a2062cceb45a660,m,350.00,350.00,350.00
1308,1309,Arabica,"exportadora atlantic, s.a.",Nicaragua,finca las marías,017-053-0211/ 017-053-0212,beneficio atlantic condega,017-053-0211/ 017-053-0212,exportadora atlantic s.a,1100,...,Green,5,"June 6th, 2018",Instituto Hondureño del Café,b4660a57e9f8cc613ae5b8f02bfce8634c763ab4,7f521ca403540f81ec99daec7da19c2788393880,m,1100.00,1100.00,1100.00
1309,1310,Arabica,juan luis alvarado romero,Guatemala,finca el limon,,beneficio serben,11/853/165,unicafe,4650,...,Green,4,"May 24th, 2013",Asociacion Nacional Del Café,b1f20fe3a819fd6b2ee0eb8fdc3da256604f1e53,724f04ad10ed31dbb9d260f0dfd221ba48be8a95,ft,1417.32,1417.32,1417.32


In [None]:
from pyspark.sql.types import *

# Auxiliary functions: build an explicit Spark schema from a pandas frame's dtypes.

# pandas dtype name -> Spark type class. Any dtype not listed here (e.g. 'object',
# 'category', nullable extension dtypes) falls back to StringType.
_PANDAS_TO_SPARK_TYPES = {
    'datetime64[ns]': TimestampType,
    'int64': LongType,
    'int32': IntegerType,
    'int16': ShortType,
    'int8': ByteType,
    'float64': DoubleType,
    'float32': FloatType,
    'bool': BooleanType,
}

def equivalent_type(f):
    """Return the Spark SQL type instance matching pandas dtype ``f``.

    ``f`` may be a dtype object or its name as a string; unmapped dtypes map to ``StringType()``.
    """
    return _PANDAS_TO_SPARK_TYPES.get(str(f), StringType)()

def define_structure(string, format_type):
    """Build a ``StructField`` named ``string`` whose type is derived from pandas dtype ``format_type``."""
    try:
        typo = equivalent_type(format_type)
    except TypeError:
        # Defensive fallback for an exotic dtype object; narrower than the former bare
        # `except:`, which also swallowed KeyboardInterrupt/SystemExit.
        typo = StringType()
    return StructField(string, typo)

# Given pandas dataframe, it will return a spark's dataframe.
def pandas_to_spark(pandas_df):
    """Convert ``pandas_df`` to a Spark DataFrame using an explicit, dtype-derived schema.

    Uses the module-level ``spark`` session.
    NOTE(review): missing values in object columns are not converted to None here; later
    cells see missing farm names as the string "NaN".
    """
    p_schema = StructType([
        define_structure(column, dtype)
        for column, dtype in pandas_df.dtypes.items()
    ])
    return spark.createDataFrame(pandas_df, p_schema)

In [None]:
# Convert the raw pandas frame to Spark with the schema derived by pandas_to_spark.
df_arabica = pandas_to_spark(raw_arabica)

In [None]:
# Print the first 10 rows of the Spark frame (very wide: one column per CSV field).
df_arabica.show(10)

+----------+-------+--------------------+-----------------+--------------------+----------+-----------------+----------+--------------------+-------------+--------------------+--------------------+--------------+----------+--------------------+------------+--------------------+--------------------+-------+-----------------+-----+------+----------+-------+----+-------+----------+---------+---------+-------------+----------------+--------+--------------------+-------+------------+--------------------+--------------------+--------------------+---------------------+---------------------+-------------------+-------------------+--------------------+--------------------+
|Unnamed: 0|Species|               Owner|Country.of.Origin|           Farm.Name|Lot.Number|             Mill|ICO.Number|             Company|     Altitude|              Region|            Producer|Number.of.Bags|Bag.Weight|  In.Country.Partner|Harvest.Year|        Grading.Date|             Owner.1|Variety|Processing.Method|Ar

In [None]:
# Normalise column names: remove all whitespace and replace '.' with '_'
# (presumably so the names can be used in Spark without backtick-quoting).
# The comprehension variable is deliberately not named `col`: a module-level loop variable
# called `col` would shadow pyspark.sql.functions.col whenever this cell is re-run after that import.
tempList = ["".join(name.split()).replace('.', '_') for name in df_arabica.columns]
print(tempList)

df_arabica = df_arabica.toDF(*tempList)

['Unnamed:0', 'Species', 'Owner', 'Country_of_Origin', 'Farm_Name', 'Lot_Number', 'Mill', 'ICO_Number', 'Company', 'Altitude', 'Region', 'Producer', 'Number_of_Bags', 'Bag_Weight', 'In_Country_Partner', 'Harvest_Year', 'Grading_Date', 'Owner_1', 'Variety', 'Processing_Method', 'Aroma', 'Flavor', 'Aftertaste', 'Acidity', 'Body', 'Balance', 'Uniformity', 'Clean_Cup', 'Sweetness', 'Cupper_Points', 'Total_Cup_Points', 'Moisture', 'Category_One_Defects', 'Quakers', 'Color', 'Category_Two_Defects', 'Expiration', 'Certification_Body', 'Certification_Address', 'Certification_Contact', 'unit_of_measurement', 'altitude_low_meters', 'altitude_high_meters', 'altitude_mean_meters']


In [None]:
# Distinct farm names among Brazilian lots (up to 100 rows, 100 chars per cell).
(df_arabica
    .where(df_arabica["Country_of_Origin"] == 'Brazil')
    .select('Farm_name')
    .distinct()
    .show(100, 100))

+-----------------------------------+
|                          Farm_name|
+-----------------------------------+
|                  fazenda rio verde|
|                    fazenda do lobo|
|                fazenda grota funda|
|                        sitio claro|
|                        santa alina|
|                   fazenda chamusca|
|                        santa maria|
|                        capoeirinha|
|                  fazenda do sertao|
|                         santa fé 2|
|                    café do paraíso|
|            cachoeira da grama farm|
|             são francisco da serra|
|                     fazenda jericó|
|                        sertao farm|
|                   campo das flores|
|                       olhos d'agua|
|       fazenda serra de três barras|
|                    fazenda pantano|
|              pereira estate coffee|
|                          rio verde|
|                  sitío são geraldo|
|                   fazenda baipendi|
|           

In [None]:
# Hand-collected coordinates for the Brazilian farms listed above.
# Each tuple is (Farm_name, latitude, longitude): the first number is the latitude (Brazil spans
# roughly +5 to -34 degrees) and the second the longitude (roughly -34 to -74 degrees).
# The column names previously had these two swapped.
df_farms = spark.createDataFrame(
    [
        ("fazenda rio verde",-21.877600079894428, -45.17833587173798 ),
        ("fazenda do lobo", -20.05845833645814, -45.551377369807916),
        ("fazenda grota funda",-4.498676298642221, -46.01438009152326),
        ("sitio claro",-12.404400112670487, -57.0307320652044),
        ("santa alina",-21.76121880758086, -46.674253002625804),
        ("fazenda chamusca",-21.4532483711391, -45.22708818550831),
        ("santa maria",-16.609194745742165, -46.98365752985455),
        ("capoeirinha",-18.64698614414741, -45.796849953979795),
        ("fazenda do sertao",-22.09933939187727, -45.18968007378277),
        ("santa fé 2",-17.582654936016926, -47.2198752950817),
        ("café do paraíso",-22.094568087843687, -45.155496432162685),
        ("cachoeira da grama farm",-21.76626556295618, -46.702161544954144),
        ("são francisco da serra",-22.629913000779446, -44.601043902591805),
        ("fazenda jericó",-18.676315608302055, -45.70281563093488),
        ("sertao farm",-22.099418916815903, -45.18965861611114),
        ("campo das flores",-20.312258124962906, -43.28548394936318),
        ("olhos d'agua",-18.63831446703355, -46.952789277527444),
        ("fazenda serra de três barras",-19.560780988980817, -46.579303497451384),
        ("fazenda pantano",-18.631997524421426, -46.82473127385452),
        ("pereira estate coffee",-22.112480964214626, -45.15508902245408),
        ("rio verde",-21.940510105475326, -45.176192869009476),
        ("sitío são geraldo",-22.59528143237296, -46.66759241371165),
        ("fazenda baipendi",-21.45008093579659, -46.8357381855336),
        ("água limpa",-21.44645605895183, -46.82635044153258),
        ("fazenda kaquend",-21.435236392716707, -46.83244978479159),
        ("fazenda santo antonio",-21.40862617568116, -46.80560442679252),
        ("fazenda vista alegre",-21.44237673402853, -46.818513026788274),
        ("fazenda recreio",-21.780100162105256, -46.67880344388111),
        ("fazenda capoeirnha",-21.761009557830032, -46.67507912298348),
        ("pantano",-21.444638601462618, -46.81747769417689),
        ("fazenda são sebastião",-21.444863288363614, -46.827235570452125),
        ("santa bárbara",-18.52732198399872, -47.569893903127564),
        ("santa mariana",-23.19046310851501, -50.55948683365515),
        ("sertao",-22.095760723175886, -45.189755175672296),
        ("são rafael_ ra/ras certified",-22.784292492565246, -47.032534461666536),
        ("sitío santa luzia",-22.366016295045306, -46.47371725096838),
        ("fazenda são josé mirante",-22.785019797545694, -47.03177457322967),
        ("cianorte",-22.774138613223663, -47.02968245276757),
        ("juliana",-21.46270024286904, -46.832515362350975),
        ("sitío corrego da olaria/são caetano",-22.745464486282955, -47.0338567362583),
        # TODO(review): latitude looks truncated and the longitude duplicates "santa alina" — verify.
        ("fazenda serra negra",-21., -46.674253002625804),
        ("fazendas klem ltda",-20.27591897959779, -41.876604035124465),
        ("castelhana farm",-18.922960557030457, -47.45831617257145),
        ("leticia farm",-19.807319369835636, -42.215697182335234),
        ("helena",-22.107657064931225, -48.32067672583973),
        ("caxambu",-21.339808722609096, -45.42201735838091),
    ],
    ["Farm_name", "lat", "long"]
)

In [None]:
# Attach coordinates by farm name; lots whose farm is not in df_farms keep null coordinates.
df_arabica = df_arabica.join(df_farms, 'Farm_name', 'left')

In [None]:
# Brazilian lots with their coordinates, cup scores and raw altitude text.
(df_arabica
    .where(df_arabica["Country_of_Origin"] == 'Brazil')
    .select('Farm_name', 'long', 'lat', 'Total_Cup_Points', 'Aroma', 'Altitude')
    .show(100, 100))

+----------------------------+-------------------+-------------------+----------------+-----+--------------+
|                   Farm_name|               long|                lat|Total_Cup_Points|Aroma|      Altitude|
+----------------------------+-------------------+-------------------+----------------+-----+--------------+
|           fazenda rio verde|-21.877600079894428| -45.17833587173798|            84.0| 7.58|          1260|
|           fazenda rio verde|-21.877600079894428| -45.17833587173798|           83.83| 7.92|          1260|
|             fazenda do lobo| -20.05845833645814|-45.551377369807916|           83.17| 7.75|         1000m|
|         fazenda grota funda| -4.498676298642221| -46.01438009152326|           84.92|  8.0|         1200m|
|                 sitio claro|-12.404400112670487|  -57.0307320652044|           84.92|  8.0|        1000 m|
|                 santa alina| -21.76121880758086|-46.674253002625804|           84.33| 8.17|1200m to 1350m|
|            fazend

In [None]:
from pyspark.sql.functions import regexp_extract
from pyspark.sql.functions import when
from pyspark.sql.functions import col
import pyspark.sql.functions as func

Création d'un dataframe contenant uniquement les fermes présentes au Brésil

In [None]:
# Restrict the data set to lots from Brazil.
df_brazil = df_arabica.where(df_arabica.Country_of_Origin == 'Brazil')

In [None]:
# Number of Brazilian lots per farm, most frequent first.
# groupBy().count() already names its column "count"; the former `.alias("count")` aliased
# the whole DataFrame, not the column, and had no effect.
df_brazil.groupBy('Farm_name')\
          .count()\
          .sort(col("count").desc())\
          .show()

+--------------------+-----+
|           Farm_name|count|
+--------------------+-----+
|           rio verde|   23|
|                 NaN|   18|
|  fazenda capoeirnha|   13|
|         capoeirinha|   10|
|   fazenda rio verde|    4|
|cachoeira da gram...|    4|
|         sertao farm|    4|
|pereira estate co...|    4|
|    campo das flores|    2|
|     fazenda kaquend|    2|
|     fazenda recreio|    2|
|   fazenda do sertao|    2|
|  fazendas klem ltda|    2|
|         santa maria|    2|
|     café do paraíso|    2|
|     fazenda do lobo|    1|
| fazenda grota funda|    1|
|         sitio claro|    1|
|    fazenda chamusca|    1|
|         santa alina|    1|
+--------------------+-----+
only showing top 20 rows



- Suppression des symboles non numériques
- Même système de mesure appliqué à toutes les données (mètres)
- Changement du type des données : de string à int

In [None]:
#https://spark.apache.org/docs/3.1.2/api/python/reference/api/pyspark.sql.functions.regexp_extract.html
df_brazil = df_brazil.withColumn('Altitude_2', regexp_extract(col('Altitude'), '[0-9]+\.?[0-9]*', 0))

#https://sparkbyexamples.com/pyspark/pyspark-when-otherwise/
df_brazil = df_brazil.withColumn("Altitude_3", when(df_brazil.Altitude_2 < 10, df_brazil.Altitude_2 * 1000)
                                          .when(df_brazil.Altitude_2 > 10000, df_brazil.Altitude_2 / 10)
                                          .otherwise(df_brazil.Altitude_2))


df_brazil = df_brazil.withColumn("Altitude_int",df_brazil.Altitude_3.cast('int'))

df_brazil.drop(col("Altitude"))

df_brazil.filter(df_arabica["Country_of_Origin"]=='Brazil')\
       .select(['Farm_name', "Altitude_int"])\
       .show()

+--------------------+------------+
|           Farm_name|Altitude_int|
+--------------------+------------+
|   fazenda rio verde|        1260|
|   fazenda rio verde|        1260|
|     fazenda do lobo|        1000|
| fazenda grota funda|        1200|
|         sitio claro|        1000|
|         santa alina|        1200|
|    fazenda chamusca|         900|
|         santa maria|        null|
|         capoeirinha|         934|
|         capoeirinha|         934|
|         capoeirinha|         905|
|         capoeirinha|         872|
|         capoeirinha|         905|
|         capoeirinha|         905|
|         capoeirinha|         890|
|         capoeirinha|         934|
|   fazenda do sertao|        1250|
|          santa fé 2|         900|
|     café do paraíso|         894|
|cachoeira da gram...|        null|
+--------------------+------------+
only showing top 20 rows



Création de 2 dataframes, l'un contenant l'altitude moyenne de l'ensemble des fermes, l'autre contenant les altitudes moyennes de chaque ferme

In [None]:
# Per-farm mean altitude (truncated to int), keeping the column name "avg(Altitude_int)"
# that later cells reference.
mean_alt = df_brazil.groupBy('Farm_name')\
                    .agg(func.avg('Altitude_int').cast('int').alias('avg(Altitude_int)'))

# Overall mean altitude across every Brazilian row, as a one-row DataFrame.
mean_alt_all = df_brazil.agg(func.avg('Altitude_int'))

mean_alt_all.show()
mean_alt.show()

+------------------+
| avg(Altitude_int)|
+------------------+
|1053.6857142857143|
+------------------+

+--------------------+-----------------+
|           Farm_name|avg(Altitude_int)|
+--------------------+-----------------+
|   fazenda rio verde|             1260|
|     fazenda do lobo|             1000|
| fazenda grota funda|             1200|
|         sitio claro|             1000|
|         santa alina|             1200|
|    fazenda chamusca|              900|
|         santa maria|             null|
|         capoeirinha|              915|
|   fazenda do sertao|             1250|
|          santa fé 2|              900|
|     café do paraíso|              894|
|cachoeira da gram...|             1250|
|são francisco da ...|              950|
|      fazenda jericó|             1100|
|         sertao farm|             1225|
|    campo das flores|             1000|
|        olhos d'agua|              900|
|fazenda serra de ...|             1250|
|     fazenda pantano|           

Altitude moyenne arrondie de l'ensemble des fermes

In [None]:
# Collapse the one-row aggregate into a plain Python int (rounded overall mean altitude).
# Note: this rebinds mean_alt_all from a DataFrame to an int, so re-running this cell alone fails.
mean_alt_all = round(mean_alt_all.first()["avg(Altitude_int)"])

In [None]:
# Turn float NaN values into nulls. This presumably only affects numeric columns: Farm_name
# still contains the string "NaN" after this cell (see the output of the farm-mean join below).
df_brazil = df_brazil.replace(float('nan'), None)

Si une ligne n'a ni ferme ni altitude, on lui attribue l'altitude moyenne de l'ensemble des fermes

In [None]:
# Rows with neither a farm name nor an altitude get the rounded overall mean altitude.
# Missing farm names appear as the literal string "NaN" (see the groupBy output above) rather
# than as null, so both forms are treated as "no farm"; testing isNull() alone never matched.
missing_farm = df_brazil["Farm_name"].isNull() | (df_brazil["Farm_name"] == "NaN")
df_brazil = df_brazil.withColumn("Altitude_int", when(missing_farm & df_brazil["Altitude_int"].isNull(), mean_alt_all)\
                                            .otherwise(df_brazil.Altitude_int))

df_brazil = df_brazil.withColumn("Altitude_int",df_brazil.Altitude_int.cast('int'))

df_brazil.select(['Farm_name', "Altitude_int"])\
       .show()

+--------------------+------------+
|           Farm_name|Altitude_int|
+--------------------+------------+
|   fazenda rio verde|        1260|
|   fazenda rio verde|        1260|
|     fazenda do lobo|        1000|
| fazenda grota funda|        1200|
|         sitio claro|        1000|
|         santa alina|        1200|
|    fazenda chamusca|         900|
|         santa maria|        null|
|         capoeirinha|         934|
|         capoeirinha|         934|
|         capoeirinha|         905|
|         capoeirinha|         872|
|         capoeirinha|         905|
|         capoeirinha|         905|
|         capoeirinha|         890|
|         capoeirinha|         934|
|   fazenda do sertao|        1250|
|          santa fé 2|         900|
|     café do paraíso|         894|
|cachoeira da gram...|        null|
+--------------------+------------+
only showing top 20 rows



Si une ligne a une ferme mais pas d'altitude, on lui attribue l'altitude moyenne de cette ferme

In [None]:
# Rows that have a farm but no altitude get that farm's mean altitude (from mean_alt).
# A left join keeps every row of df_brazil: with an inner join, rows whose Farm_name is null
# would be silently dropped, because null join keys never match.
# The "avg(Altitude_int)" column from mean_alt stays in df_brazil, as before.
df_brazil = df_brazil.join(mean_alt, on="Farm_name", how="left")\
       .withColumn(
        "Altitude_int",
        func.coalesce(df_brazil["Altitude_int"], mean_alt["avg(Altitude_int)"])
    )

df_brazil.select(['Farm_name', "Altitude_int"])\
       .show()

+--------------------+------------+
|           Farm_name|Altitude_int|
+--------------------+------------+
|                 NaN|        1100|
|                 NaN|        1100|
|                 NaN|        1100|
|                 NaN|        1100|
|                 NaN|        1100|
|                 NaN|        1100|
|                 NaN|        1100|
|                 NaN|        1100|
|                 NaN|        1100|
|                 NaN|        1100|
|                 NaN|        1100|
|                 NaN|        1100|
|                 NaN|        1100|
|                 NaN|        1100|
|                 NaN|        1100|
|                 NaN|        1100|
|                 NaN|        1100|
|                 NaN|        1100|
|cachoeira da gram...|        1250|
|cachoeira da gram...|        1250|
+--------------------+------------+
only showing top 20 rows



Si après cela, il y a toujours des lignes sans altitude, on leur attribue l'altitude moyenne de l'ensemble des fermes

In [None]:
# Final fallback: any row still lacking an altitude gets the rounded overall mean, as an int.
df_brazil = df_brazil.withColumn(
    "Altitude_int",
    func.coalesce(df_brazil["Altitude_int"], func.lit(mean_alt_all)).cast('int'),
)

df_brazil.select('Farm_name', 'Altitude_int').show()

+--------------------+------------+
|           Farm_name|Altitude_int|
+--------------------+------------+
|                 NaN|        1100|
|                 NaN|        1100|
|                 NaN|        1100|
|                 NaN|        1100|
|                 NaN|        1100|
|                 NaN|        1100|
|                 NaN|        1100|
|                 NaN|        1100|
|                 NaN|        1100|
|                 NaN|        1100|
|                 NaN|        1100|
|                 NaN|        1100|
|                 NaN|        1100|
|                 NaN|        1100|
|                 NaN|        1100|
|                 NaN|        1100|
|                 NaN|        1100|
|                 NaN|        1100|
|cachoeira da gram...|        1250|
|cachoeira da gram...|        1250|
+--------------------+------------+
only showing top 20 rows



In [None]:
# Sanity check: list rows still missing an altitude (expected to be empty).
df_brazil.where(df_brazil.Altitude_int.isNull())\
         .select('Farm_name', 'Altitude_int')\
         .show()

+---------+------------+
|Farm_name|Altitude_int|
+---------+------------+
+---------+------------+

