In [4]:
from pyspark.sql.functions import isnan, when, count, col
from pyspark.sql.functions import desc
import pyspark.sql.functions as F

# Extract

In [5]:
from pyspark.sql import SparkSession
def load_data(file_path):
    """Read a CSV file into a Spark DataFrame.

    Args:
        file_path: Path of the CSV file to read.

    Returns:
        The DataFrame returned by ``spark.read.csv``, read with
        ``header=True`` and ``inferSchema=True``.
    """
    session = SparkSession.builder.getOrCreate()
    return session.read.csv(file_path, header=True, inferSchema=True)


df = load_data("soil_pollution_diseases.csv")

#df.show()

In [6]:
# Print the column names and types of the freshly loaded DataFrame.
df.printSchema()

root
 |-- Case_ID: string (nullable = true)
 |-- Date_Reported: date (nullable = true)
 |-- Region: string (nullable = true)
 |-- Country: string (nullable = true)
 |-- Pollutant_Type: string (nullable = true)
 |-- Pollutant_Concentration_mg_kg: double (nullable = true)
 |-- Soil_pH: double (nullable = true)
 |-- Temperature_C: double (nullable = true)
 |-- Humidity_%: double (nullable = true)
 |-- Rainfall_mm: double (nullable = true)
 |-- Crop_Type: string (nullable = true)
 |-- Farming_Practice: string (nullable = true)
 |-- Nearby_Industry: string (nullable = true)
 |-- Water_Source_Type: string (nullable = true)
 |-- Soil_Texture: string (nullable = true)
 |-- Soil_Organic_Matter_%: double (nullable = true)
 |-- Disease_Type: string (nullable = true)
 |-- Disease_Severity: string (nullable = true)
 |-- Health_Symptoms: string (nullable = true)
 |-- Age_Group_Affected: string (nullable = true)
 |-- Gender_Most_Affected: string (nullable = true)
 |-- Mitigation_Measure: string (null

# Transform

### Drop kolom

Drop kolom 'Region' karena tidak digunakan pada analisis; informasi daerah sudah direpresentasikan oleh kolom 'Country'.

In [7]:
# Remove the 'Region' column, then print the schema to confirm the result.
df = df.drop("Region")
df.printSchema()

root
 |-- Case_ID: string (nullable = true)
 |-- Date_Reported: date (nullable = true)
 |-- Country: string (nullable = true)
 |-- Pollutant_Type: string (nullable = true)
 |-- Pollutant_Concentration_mg_kg: double (nullable = true)
 |-- Soil_pH: double (nullable = true)
 |-- Temperature_C: double (nullable = true)
 |-- Humidity_%: double (nullable = true)
 |-- Rainfall_mm: double (nullable = true)
 |-- Crop_Type: string (nullable = true)
 |-- Farming_Practice: string (nullable = true)
 |-- Nearby_Industry: string (nullable = true)
 |-- Water_Source_Type: string (nullable = true)
 |-- Soil_Texture: string (nullable = true)
 |-- Soil_Organic_Matter_%: double (nullable = true)
 |-- Disease_Type: string (nullable = true)
 |-- Disease_Severity: string (nullable = true)
 |-- Health_Symptoms: string (nullable = true)
 |-- Age_Group_Affected: string (nullable = true)
 |-- Gender_Most_Affected: string (nullable = true)
 |-- Mitigation_Measure: string (nullable = true)
 |-- Case_Resolved: strin

### Null Values

Mencari kolom yang memiliki nilai Null

Kolom numerikal:

In [8]:
# Kolom numerik
import pyspark.sql.functions as F
def count_missings(spark_df, sort=True):
    """Count missing values (NULL or NaN) per non-string, non-temporal column.

    Columns whose Spark type is 'timestamp', 'string' or 'date' are skipped.

    Args:
        spark_df: Spark DataFrame to inspect.
        sort: When True (default), return the counts transposed (one row per
            column, a single 'count' column) sorted in descending order.
            When False, return the raw one-row pandas DataFrame.

    Returns:
        A pandas DataFrame with the counts, or None (after printing a
        message) when no column is eligible for the check.
    """
    skipped_types = ('timestamp', 'string', 'date')
    # Only floating-point columns can hold NaN; applying isnan to other
    # types (e.g. boolean) is unnecessary and may be rejected by Spark.
    nan_capable_types = ('float', 'double')

    checked = [
        (c, c_type) for (c, c_type) in spark_df.dtypes
        if c_type not in skipped_types
    ]

    # Guard up front: the aggregation below yields one row whenever at least
    # one column is selected, so testing the result length cannot detect
    # this case reliably.
    if not checked:
        print("There are no columns to check for missing values!")
        return None

    def _is_missing(name, c_type):
        # Build the "value is missing" predicate for one column.
        if c_type in nan_capable_types:
            return F.isnan(name) | F.isnull(name)
        return F.isnull(name)

    # `when` without `otherwise` yields NULL for non-matching rows, and
    # `count` ignores NULLs, so each aggregate counts only the missing rows.
    counts = spark_df.select(
        [F.count(F.when(_is_missing(c, c_type), c)).alias(c) for (c, c_type) in checked]
    ).toPandas()

    if sort:
        return counts.rename(index={0: 'count'}).T.sort_values("count", ascending=False)

    return counts

In [9]:
# Missing-value counts for the columns that are not string, date or timestamp.
count_missings(df)

Unnamed: 0,count
Pollutant_Concentration_mg_kg,0
Soil_pH,0
Temperature_C,0
Humidity_%,0
Rainfall_mm,0
Soil_Organic_Matter_%,0


Kolom kategorikal:

In [10]:
from pyspark.sql.types import StringType, IntegerType, DoubleType
schema = df.schema
# Collect the names of all fields whose Spark type is StringType.
string_cols = [
    field.name
    for field in schema.fields
    if isinstance(field.dataType, StringType)
]
# Sub-frame holding only the string columns.
df_string = df.select(string_cols)
df_string.printSchema()

root
 |-- Case_ID: string (nullable = true)
 |-- Country: string (nullable = true)
 |-- Pollutant_Type: string (nullable = true)
 |-- Crop_Type: string (nullable = true)
 |-- Farming_Practice: string (nullable = true)
 |-- Nearby_Industry: string (nullable = true)
 |-- Water_Source_Type: string (nullable = true)
 |-- Soil_Texture: string (nullable = true)
 |-- Disease_Type: string (nullable = true)
 |-- Disease_Severity: string (nullable = true)
 |-- Health_Symptoms: string (nullable = true)
 |-- Age_Group_Affected: string (nullable = true)
 |-- Gender_Most_Affected: string (nullable = true)
 |-- Mitigation_Measure: string (nullable = true)
 |-- Case_Resolved: string (nullable = true)
 |-- Follow_Up_Required: string (nullable = true)



In [11]:
# Categorical columns: count the NULLs of every string column in a single
# aggregation pass instead of launching one filter+count job per column.
# `when` without `otherwise` yields NULL for non-matching rows and `count`
# ignores NULLs, so each aggregate equals the number of NULL values.
Dict_Null = (
    df_string.select(
        [
            F.count(F.when(df_string[name].isNull(), 1)).alias(name)
            for name in df_string.columns
        ]
    ).first().asDict()
    if df_string.columns
    else {}
)
Dict_Null

{'Case_ID': 0,
 'Country': 0,
 'Pollutant_Type': 0,
 'Crop_Type': 0,
 'Farming_Practice': 0,
 'Nearby_Industry': 0,
 'Water_Source_Type': 0,
 'Soil_Texture': 0,
 'Disease_Type': 0,
 'Disease_Severity': 0,
 'Health_Symptoms': 0,
 'Age_Group_Affected': 0,
 'Gender_Most_Affected': 0,
 'Mitigation_Measure': 0,
 'Case_Resolved': 0,
 'Follow_Up_Required': 0}

In [12]:
# Show the distinct values of every string column.
# The loop variable is deliberately not named `col`: a module-level `for`
# loop rebinds its variable globally, which would overwrite the `col`
# function imported from pyspark.sql.functions and break later `col(...)` calls.
for column_name in df_string.columns:
    print(f"Unique values in column '{column_name}':")
    df_string.select(column_name).distinct().show()

Unique values in column 'Case_ID':
+-----------+
|    Case_ID|
+-----------+
|CASE_100050|
|CASE_100228|
|CASE_100335|
|CASE_100512|
|CASE_100896|
|CASE_101065|
|CASE_101085|
|CASE_101191|
|CASE_101674|
|CASE_101693|
|CASE_101805|
|CASE_102166|
|CASE_102167|
|CASE_102708|
|CASE_102803|
|CASE_102810|
|CASE_100090|
|CASE_100139|
|CASE_100317|
|CASE_100978|
+-----------+
only showing top 20 rows

Unique values in column 'Country':
+---------+
|  Country|
+---------+
|  Germany|
|    India|
|    China|
|  Nigeria|
|      USA|
|   Mexico|
|   Brazil|
|    Kenya|
|Australia|
| Pakistan|
+---------+

Unique values in column 'Pollutant_Type':
+--------------+
|Pollutant_Type|
+--------------+
|       Arsenic|
|    Pesticides|
|          Lead|
|       Mercury|
|       Cadmium|
|      Chromium|
+--------------+

Unique values in column 'Crop_Type':
+---------+
|Crop_Type|
+---------+
|  Soybean|
|   Potato|
|    Wheat|
|   Cotton|
|     Rice|
|     Corn|
+---------+

Unique values in column 'Far

*Didapatkan bahwa terdapat value 'None' pada kolom Nearby_Industry sehingga baris tersebut perlu didrop, karena jumlahnya relatif rendah serta tidak representatif jika diganti menggunakan ukuran pemusatan (central tendency).*

In [13]:
# Drop the rows whose Nearby_Industry value is the string 'None'.
# NOTE(review): rows where Nearby_Industry is NULL would presumably be dropped
# too, because the comparison does not evaluate to true for them — the NULL
# count reported for this column earlier was 0, so this should not matter here.
df = df.filter(df.Nearby_Industry != 'None')

In [14]:
# Confirm that 'None' no longer appears among the Nearby_Industry values.
df.select('Nearby_Industry').distinct().show()

+---------------+
|Nearby_Industry|
+---------------+
|       Chemical|
|         Mining|
|        Textile|
|    Agriculture|
+---------------+



In [15]:
# Number of rows remaining after the filter.
df.count()

2418

### Inconsistent Format

Mengubah tipe data kolom Case_Resolved dan Follow_Up_Required dari string ('Yes'/'No') menjadi boolean (true/false).

In [16]:
from pyspark.sql.functions import when
# Recode the two 'Yes'/'No' string flags as booleans: 'Yes' -> True,
# 'No' -> False, anything else -> NULL.
for flag_column in ("Case_Resolved", "Follow_Up_Required"):
    is_yes = df[flag_column] == "Yes"
    is_no = df[flag_column] == "No"
    df = df.withColumn(
        flag_column,
        when(is_yes, True).when(is_no, False).otherwise(None),
    )

In [17]:
# Check the distinct values of both flag columns after the conversion.
df.select('Case_Resolved').distinct().show()
df.select('Follow_Up_Required').distinct().show()

+-------------+
|Case_Resolved|
+-------------+
|         true|
|        false|
+-------------+

+------------------+
|Follow_Up_Required|
+------------------+
|              true|
|             false|
+------------------+



### Create Tables

Mengelompokkan kolom ke dalam tabel dimensi (dim) dan tabel fakta (fact); setiap tabel menyertakan Case_ID sebagai kunci.

In [18]:
# Case table: reporting and follow-up attributes, keyed by Case_ID.
case = df.select([
    "Case_ID",
    "Date_Reported",
    "Country",
    "Mitigation_Measure",
    "Case_Resolved",
    "Follow_Up_Required",
])

In [20]:
# Pollutant table: pollutant type and concentration, keyed by Case_ID.
pollutant = df.select([
    "Case_ID",
    "Pollutant_Type",
    "Pollutant_Concentration_mg_kg",
])

In [21]:
# Soil table: soil and weather measurements, keyed by Case_ID.
soil = df.select([
    "Case_ID",
    "Soil_pH",
    "Temperature_C",
    "Humidity_%",
    "Rainfall_mm",
    "Soil_Texture",
    "Soil_Organic_Matter_%",
])

In [22]:
# Crop table: crop and farming attributes, keyed by Case_ID.
# This selection was previously assigned to `soil`, which silently replaced
# the soil-measurement table; it gets its own name so both tables coexist.
# NOTE(review): confirm the name `crop` against the cells that consume it.
crop = df.select("Case_ID",
                 "Crop_Type",
                 "Farming_Practice",
                 "Water_Source_Type")