# `This notebook contains all spark code for our project`

## Imports

In [28]:
import findspark
findspark.init()
import pandas as pd
import pandas as pd
from pyspark.sql import SparkSession
from pyspark.sql.functions import first
from pyspark.sql import SparkSession, Row
from pyspark.sql.functions import col, year, month, to_date, min, max, avg, first, lag, round, count, when, trim, isnan, lit, substring, col, ceil, concat, sum as spark_sum
from pyspark.sql.window import Window
from pyspark.sql.types import DecimalType, StringType, DoubleType, DecimalType
import happybase


## Data Preparation

### `COMTRADE`

In [11]:

spark = SparkSession.builder.appName("data_prep").getOrCreate()
spark.sparkContext.setLogLevel("ERROR")
print("Spark session ready:", spark)

parquet_path = "/user/vagrant/project/comtrade/comtrade_data"

def load_joined_df(parquet_path):
    print("\n=== LOAD PARQUET ===")
    print("Path:", parquet_path)

    try:
        df = spark.read.parquet(parquet_path)
        print("Parquet wczytany")
        print("Liczba kolumn:", len(df.columns))
        print("Przykładowe kolumny:", df.columns[:10])
        print("Schema:")
        df.printSchema()
        print("Sample rows:")
        df.show(5, truncate=False)
        return df

    except Exception as e:
        print("Błąd wczytywania Parquet")
        print(type(e).__name__)
        print(e)
        raise

df = load_joined_df(parquet_path)

print("\n=== BASIC CHECK ===")
print("Row count:", df.count())


# gdzie są braki danych
def missing_condition(column_name, data_type):
    c = col(column_name)

    if isinstance(data_type, StringType):
        return c.isNull() | (trim(c) == "")
    
    elif isinstance(data_type, (DoubleType, DecimalType)):
        return c.isNull() | isnan(c)
    
    else:
        return c.isNull()

print("\n=== MISSING VALUES CHECK ===")

total_rows = df.count()
print("Total rows:", total_rows)

missing_report = df.select([
    count(
        when(
            missing_condition(c, df.schema[c].dataType),
            c
        )
    ).alias(c)
    for c in df.columns
])

missing_report.show(truncate=False)



print("\n=== FILL NA ===")

df = df.fillna({
    "quantity": 0,
    "weight": 0
})

print("Filled NA for quantity, weight")

df.select("quantity", "weight").summary().show()

# zamiana parnterISO X1 i _X na UNIDENTIFIED
print("\n=== CLEAN partnerISO ===")

df.select("partnerISO").distinct().show(20, truncate=False)

df = df.withColumn(
    "partnerISO",
    when(trim(col("partnerISO")).isin("X1", "_X", "E19", "F19"), "UNIDENTIFIED")
    .otherwise(trim(col("partnerISO")))
)

print("partnerISO cleaned")
df.select("partnerISO").distinct().show(20, truncate=False)

# tworzenie nowych kolumn: unit_value_usd, usd_per_kg, year, month, quarter, quarter_label
print("\n=== UNIT VALUES ===")

df = df.withColumn(
    "unit_value_usd",
    when(col("quantity") > 0, col("primary_value_usd") / col("quantity"))
    .otherwise(lit(0))
)

df = df.withColumn(
    "usd_per_kg",
    when(col("weight") > 0, col("primary_value_usd") / col("weight"))
    .otherwise(lit(0))
)

df.select(
    "primary_value_usd", "quantity", "weight",
    "unit_value_usd", "usd_per_kg"
).show(5, truncate=False)

print("\n=== DATE FEATURES ===")

df = (
    df.withColumn("year", substring("data_period", 1, 4).cast("int"))
      .withColumn("month", substring("data_period", 5, 2).cast("int"))
      .withColumn("quarter", ceil(col("month") / 3))
)

df = df.withColumn("quarter_label", concat(lit("Q"), col("quarter")))

df.select("data_period", "year", "month", "quarter", "quarter_label") \
  .distinct().show(10, truncate=False)


# wyliczenie month_world_export_value i share_of_month_value
print("\n=== WINDOW CALCULATIONS ===")

w = Window.partitionBy("data_period", "commodity_desc", "hs_code")

df = df.withColumn(
    "month_world_export_value",
    spark_sum(
        when(col("partnerISO") == "W00", col("primary_value_usd"))
        .otherwise(0)
    ).over(w)
)

df = df.withColumn(
    "share_of_month_market",
    when(
        col("month_world_export_value") > 0,
        col("primary_value_usd") / col("month_world_export_value")
    ).otherwise(lit(0))
)

df.select(
    "commodity_desc",
    "partnerISO",
    "primary_value_usd",
    "month_world_export_value",
    "share_of_month_market"
).show(10, truncate=False)

print("\n=== FINAL CHECKPOINT ===")
comtrade_monthly = df
print("Final row count:", comtrade_monthly.count())
print("Final schema:")
comtrade_monthly.printSchema()
print("Pipeline for montly aggregation comtrade finished successfully")


Spark session ready: <pyspark.sql.session.SparkSession object at 0x7faa3818f9a0>

=== LOAD PARQUET ===
Path: /user/vagrant/project/comtrade/comtrade_data
Parquet wczytany
Liczba kolumn: 9
Przykładowe kolumny: ['commodity_desc', 'data_period', 'hs_code', 'primary_value_usd', 'partner_code', 'partnerISO', 'quantity', 'quantity_code', 'weight']
Schema:
root
 |-- commodity_desc: string (nullable = true)
 |-- data_period: string (nullable = true)
 |-- hs_code: string (nullable = true)
 |-- primary_value_usd: double (nullable = true)
 |-- partner_code: long (nullable = true)
 |-- partnerISO: string (nullable = true)
 |-- quantity: double (nullable = true)
 |-- quantity_code: long (nullable = true)
 |-- weight: double (nullable = true)

Sample rows:
+-----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+-----------+-------+----

                                                                                

+-------+------------------+------------------+
|summary|          quantity|            weight|
+-------+------------------+------------------+
|  count|            143342|            143342|
|   mean|316234.47579301987|497239.90725810936|
| stddev|2927262.1000720784| 3825894.374747015|
|    min|               0.0|               0.0|
|    25%|               0.0|             541.0|
|    50%|             172.0|           10766.0|
|    75%|           22145.0|           95897.0|
|    max|       1.6352964E8|      1.95538731E8|
+-------+------------------+------------------+


=== CLEAN partnerISO ===
+----------+
|partnerISO|
+----------+
|NIU       |
|HTI       |
|PSE       |
|LVA       |
|BRB       |
|ZMB       |
|JAM       |
|BRA       |
|ARM       |
|MOZ       |
|JOR       |
|CUB       |
|FRA       |
|SOM       |
|ABW       |
|COD       |
|BRN       |
|URY       |
|BOL       |
|LBY       |
+----------+
only showing top 20 rows

partnerISO cleaned
+----------+
|partnerISO|
+----------+
|

                                                                                

+------------------------------+------------+-----------------+------------------------+---------------------+
|commodity_desc                |partnerISO  |primary_value_usd|month_world_export_value|share_of_month_market|
+------------------------------+------------+-----------------+------------------------+---------------------+
|Whiskies                      |BHS         |143.0            |1193.0                  |0.11986588432523052  |
|Whiskies                      |UNIDENTIFIED|72.0             |1193.0                  |0.06035205364626991  |
|Whiskies                      |UNIDENTIFIED|978.0            |1193.0                  |0.8197820620284996   |
|Whiskies                      |W00         |1193.0           |1193.0                  |1.0                  |
|Cigarettes; containing tobacco|BLZ         |54.0             |8819047.0               |6.1231105809958835E-6|
|Cigarettes; containing tobacco|IRN         |8.0              |8819047.0               |9.071274934808716E-7 |
|

In [12]:
print("\n=== START YEARLY AGGREGATION ===")

print("Input df row count:", comtrade_monthly.count())
print("Input df columns:", comtrade_monthly.columns)

print("Years available:")
comtrade_monthly.select("year").distinct().orderBy("year").show()

print("\n=== GROUP BY YEAR / PARTNER / COMMODITY ===")

df_yearly = (
    comtrade_monthly.groupBy("year", "partnerISO", "commodity_desc", "hs_code")
      .agg(
          spark_sum("primary_value_usd").alias("annual_value_usd"),
          spark_sum("quantity").alias("quantity"),
          spark_sum("weight").alias("weight")
      )
)

print("GroupBy done")
print("Yearly row count:", df_yearly.count())
print("Yearly schema:")
df_yearly.printSchema()

print("Sample yearly rows:")
df_yearly.show(10, truncate=False)

print("\n=== YEARLY UNIT VALUES ===")

df_yearly = df_yearly.withColumn(
    "unit_value_usd",
    when(col("quantity") > 0, col("annual_value_usd") / col("quantity"))
    .otherwise(lit(0))
)

df_yearly = df_yearly.withColumn(
    "usd_per_kg",
    when(col("weight") > 0, col("annual_value_usd") / col("weight"))
    .otherwise(lit(0))
)

df_yearly.select(
    "partnerISO", "commodity_desc",
    "annual_value_usd", "quantity", "weight",
    "unit_value_usd", "usd_per_kg"
).show(10, truncate=False)


print("\n=== CHECK W00 (WORLD) ===")

df_yearly.filter(col("partnerISO") == "W00") \
    .groupBy("year") \
    .agg(
        spark_sum("annual_value_usd").alias("world_export_check")
    ) \
    .orderBy("year") \
    .show(truncate=False)

print("\n=== WINDOW: WORLD EXPORT VALUE ===")

w = Window.partitionBy("year", "commodity_desc", "hs_code")

df_yearly = df_yearly.withColumn(
    "world_export_value",
    spark_sum(
        when(col("partnerISO") == "W00", col("annual_value_usd"))
        .otherwise(0)
    ).over(w)
)

df_yearly.select(
    "year",
    "commodity_desc",
    "hs_code",
    "partnerISO",
    "annual_value_usd",
    "world_export_value"
).show(10, truncate=False)

print("\n=== SHARE OF YEAR MARKET ===")

df_yearly = df_yearly.withColumn(
    "share_of_year_market",
    when(
        col("world_export_value") > 0,
        col("annual_value_usd") / col("world_export_value")
    ).otherwise(lit(0))
)

df_yearly.select(
    "partnerISO",
    "annual_value_usd",
    "world_export_value",
    "share_of_year_market"
).show(10, truncate=False)

print("\n=== FINAL YEARLY CHECKPOINT ===")

print("Final yearly row count:", df_yearly.count())
print("Final yearly schema:")
df_yearly.printSchema()

print("Created COMTRADE YEARLY successfully")




=== START YEARLY AGGREGATION ===
Input df row count: 143342
Input df columns: ['commodity_desc', 'data_period', 'hs_code', 'primary_value_usd', 'partner_code', 'partnerISO', 'quantity', 'quantity_code', 'weight', 'unit_value_usd', 'usd_per_kg', 'year', 'month', 'quarter', 'quarter_label', 'month_world_export_value', 'share_of_month_market']
Years available:


                                                                                

+----+
|year|
+----+
|2004|
|2005|
|2006|
|2007|
|2008|
|2009|
|2010|
|2011|
|2012|
|2013|
|2014|
|2015|
|2016|
|2017|
|2018|
|2019|
|2020|
|2021|
|2022|
|2023|
+----+
only showing top 20 rows


=== GROUP BY YEAR / PARTNER / COMMODITY ===
GroupBy done


                                                                                

Yearly row count: 19597
Yearly schema:
root
 |-- year: integer (nullable = true)
 |-- partnerISO: string (nullable = true)
 |-- commodity_desc: string (nullable = true)
 |-- hs_code: string (nullable = true)
 |-- annual_value_usd: double (nullable = true)
 |-- quantity: double (nullable = true)
 |-- weight: double (nullable = true)

Sample yearly rows:


                                                                                

+----+------------+-----------------------------------------------------------------------------------------------------------------------------------------------+-------+----------------+---------+---------+
|year|partnerISO  |commodity_desc                                                                                                                                 |hs_code|annual_value_usd|quantity |weight   |
+----+------------+-----------------------------------------------------------------------------------------------------------------------------------------------+-------+----------------+---------+---------+
|2005|SVK         |Dairy produce; cheese (not grated, powdered or processed), n.e.s. in heading no. 0406                                                          |040690 |1.0601537E7     |2982455.0|2982455.0|
|2005|FRA         |Medicaments; consisting of mixed or unmixed products n.e.s. in heading no. 3004, for therapeutic or prophylactic uses, packaged for retail sale|3

[Stage 166:>                                                        (0 + 1) / 1]

+------------+-----------------------------------------------------------------------------------------------------------------------------------------------+----------------+---------+---------+------------------+------------------+
|partnerISO  |commodity_desc                                                                                                                                 |annual_value_usd|quantity |weight   |unit_value_usd    |usd_per_kg        |
+------------+-----------------------------------------------------------------------------------------------------------------------------------------------+----------------+---------+---------+------------------+------------------+
|SVK         |Dairy produce; cheese (not grated, powdered or processed), n.e.s. in heading no. 0406                                                          |1.0601537E7     |2982455.0|2982455.0|3.554634353242547 |3.554634353242547 |
|FRA         |Medicaments; consisting of mixed or unmixed produc

                                                                                

+----+-------------------+
|year|world_export_check |
+----+-------------------+
|2004|1.473079576E9      |
|2005|1.975460246E9      |
|2006|2.800242706E9      |
|2007|3.759173739E9      |
|2008|7.070653686E9      |
|2009|6.542160684E9      |
|2010|6.355235911240001E9|
|2011|6.047798903E9      |
|2012|6.229114211E9      |
|2013|7.181364605952E9   |
|2014|8.301071083E9      |
|2015|7.317550526E9      |
|2016|6.772636697E9      |
|2017|8.396940478E9      |
|2018|8.78543596E9       |
|2019|8.812147066E9      |
|2020|9.516407241E9      |
|2021|1.0159426942E10    |
|2022|9.747042721E9      |
|2023|1.1404533375E10    |
+----+-------------------+
only showing top 20 rows


=== WINDOW: WORLD EXPORT VALUE ===


                                                                                

+----+-----------------------------------------------------------------------------------------------------------------------------------------------------------+-------+----------+-------------------+------------------+
|year|commodity_desc                                                                                                                                             |hs_code|partnerISO|annual_value_usd   |world_export_value|
+----+-----------------------------------------------------------------------------------------------------------------------------------------------------------+-------+----------+-------------------+------------------+
|2010|Cosmetic and toilet preparations; n.e.c. in heading no. 3304, for the care of the skin (excluding medicaments, including sunscreen or sun tan preparations)|330499 |ARM       |239459.483         |7.21757602561E8   |
|2010|Cosmetic and toilet preparations; n.e.c. in heading no. 3304, for the care of the skin (excluding medicaments,

                                                                                

+----------+-------------------+------------------+---------------------+
|partnerISO|annual_value_usd   |world_export_value|share_of_year_market |
+----------+-------------------+------------------+---------------------+
|ARM       |239459.483         |7.21757602561E8   |3.317727200244654E-4 |
|CAN       |580743.581         |7.21757602561E8   |8.04624127185301E-4  |
|DEU       |9.290044021000001E7|7.21757602561E8   |0.12871418310020288  |
|PRT       |5921842.393        |7.21757602561E8   |0.008204752360054995 |
|CYP       |63200.763999999996 |7.21757602561E8   |8.756508248163347E-5 |
|LVA       |2811203.19         |7.21757602561E8   |0.003894940877692256 |
|KGZ       |250462.284         |7.21757602561E8   |3.470171746183054E-4 |
|MNG       |3922084.0980000007 |7.21757602561E8   |0.0054340738276719746|
|CHN       |285404.698         |7.21757602561E8   |3.9543012361394384E-4|
|GTM       |108.741            |7.21757602561E8   |1.5066138494995578E-7|
+----------+-------------------+------



Final yearly row count: 19597
Final yearly schema:
root
 |-- year: integer (nullable = true)
 |-- partnerISO: string (nullable = true)
 |-- commodity_desc: string (nullable = true)
 |-- hs_code: string (nullable = true)
 |-- annual_value_usd: double (nullable = true)
 |-- quantity: double (nullable = true)
 |-- weight: double (nullable = true)
 |-- unit_value_usd: double (nullable = true)
 |-- usd_per_kg: double (nullable = true)
 |-- world_export_value: double (nullable = true)
 |-- share_of_year_market: double (nullable = true)

Created COMTRADE YEARLY successfully


                                                                                

## `NBP`

In [19]:
data = [
        # --- Bliski Wschód / Azja ---
    Row(country_iso3="PSE", currency="ILS"),
    Row(country_iso3="IRQ", currency="IQD"),
    Row(country_iso3="IRN", currency="IRR"),
    Row(country_iso3="ISR", currency="ILS"),
    Row(country_iso3="JOR", currency="JOD"),
    Row(country_iso3="KWT", currency="KWD"),
    Row(country_iso3="LBN", currency="LBP"),
    Row(country_iso3="OMN", currency="OMR"),
    Row(country_iso3="QAT", currency="QAR"),
    Row(country_iso3="SAU", currency="SAR"),
    Row(country_iso3="ARE", currency="AED"),
    Row(country_iso3="YEM", currency="YER"),
    Row(country_iso3="SYR", currency="SYP"),
    Row(country_iso3="AFG", currency="AFN"),
    
    Row(country_iso3="IND", currency="INR"),
    Row(country_iso3="CHN", currency="CNY"),
    Row(country_iso3="JPN", currency="JPY"),
    Row(country_iso3="KOR", currency="KRW"),
    Row(country_iso3="THA", currency="THB"),
    Row(country_iso3="VNM", currency="VND"),
    Row(country_iso3="MYS", currency="MYR"),
    Row(country_iso3="IDN", currency="IDR"),
    Row(country_iso3="PHL", currency="PHP"),
    Row(country_iso3="SGP", currency="SGD"),
    Row(country_iso3="LKA", currency="LKR"),
    Row(country_iso3="PAK", currency="PKR"),
    Row(country_iso3="BGD", currency="BDT"),
    Row(country_iso3="NPL", currency="NPR"),
    Row(country_iso3="MMR", currency="MMK"),
    Row(country_iso3="KHM", currency="KHR"),
    Row(country_iso3="LAO", currency="LAK"),
    Row(country_iso3="PRK", currency="KPW"),
    Row(country_iso3="MNG", currency="MNT"),
    Row(country_iso3="KAZ", currency="KZT"),
    Row(country_iso3="UZB", currency="UZS"),
    Row(country_iso3="TJK", currency="TJS"),
    Row(country_iso3="TKM", currency="TMT"),
    Row(country_iso3="KGZ", currency="KGS"),
    Row(country_iso3="AZE", currency="AZN"),
    Row(country_iso3="ARM", currency="AMD"),
    Row(country_iso3="GEO", currency="GEL"),
    Row(country_iso3="HKG", currency="HKD"),
    Row(country_iso3="MAC", currency="MOP"),
    
    # --- Europa ---
    Row(country_iso3="FIN", currency="EUR"),
    Row(country_iso3="FRA", currency="EUR"),
    Row(country_iso3="ITA", currency="EUR"),
    Row(country_iso3="ESP", currency="EUR"),
    Row(country_iso3="PRT", currency="EUR"),
    Row(country_iso3="DEU", currency="EUR"),
    Row(country_iso3="BEL", currency="EUR"),
    Row(country_iso3="NLD", currency="EUR"),
    Row(country_iso3="LUX", currency="EUR"),
    Row(country_iso3="AUT", currency="EUR"),
    Row(country_iso3="IRL", currency="EUR"),
    Row(country_iso3="EST", currency="EUR"),
    Row(country_iso3="LVA", currency="EUR"),
    Row(country_iso3="LTU", currency="EUR"),
    Row(country_iso3="SVK", currency="EUR"),
    Row(country_iso3="SVN", currency="EUR"),
    Row(country_iso3="MLT", currency="EUR"),
    Row(country_iso3="CYP", currency="EUR"),
    Row(country_iso3="GRC", currency="EUR"),
    Row(country_iso3="HRV", currency="EUR"),
    Row(country_iso3="AND", currency="EUR"),
    Row(country_iso3="MNE", currency="EUR"),
    Row(country_iso3="SMR", currency="EUR"),
    Row(country_iso3="VAT", currency="EUR"),
    
    Row(country_iso3="GBR", currency="GBP"),
    Row(country_iso3="NOR", currency="NOK"),
    Row(country_iso3="SWE", currency="SEK"),
    Row(country_iso3="DNK", currency="DKK"),
    Row(country_iso3="ISL", currency="ISK"),
    Row(country_iso3="CHE", currency="CHF"),
    Row(country_iso3="CZE", currency="CZK"),
    Row(country_iso3="HUN", currency="HUF"),
    Row(country_iso3="ROU", currency="RON"),
    Row(country_iso3="BGR", currency="BGN"),
    Row(country_iso3="SRB", currency="RSD"),
    Row(country_iso3="BIH", currency="BAM"),
    Row(country_iso3="MKD", currency="MKD"),
    Row(country_iso3="ALB", currency="ALL"),
    Row(country_iso3="BLR", currency="BYN"),
    Row(country_iso3="MDA", currency="MDL"),
    Row(country_iso3="UKR", currency="UAH"),
    Row(country_iso3="RUS", currency="RUB"),
    
    # --- Afryka ---
    Row(country_iso3="EGY", currency="EGP"),
    Row(country_iso3="MAR", currency="MAD"),
    Row(country_iso3="DZA", currency="DZD"),
    Row(country_iso3="TUN", currency="TND"),
    Row(country_iso3="ZAF", currency="ZAR"),
    Row(country_iso3="NGA", currency="NGN"),
    Row(country_iso3="GHA", currency="GHS"),
    Row(country_iso3="KEN", currency="KES"),
    Row(country_iso3="ETH", currency="ETB"),
    Row(country_iso3="UGA", currency="UGX"),
    Row(country_iso3="TZA", currency="TZS"),
    Row(country_iso3="RWA", currency="RWF"),
    Row(country_iso3="BDI", currency="BIF"),
    Row(country_iso3="SDN", currency="SDG"),
    Row(country_iso3="SSD", currency="SSP"),
    Row(country_iso3="SEN", currency="XOF"),
    Row(country_iso3="MLI", currency="XOF"),
    Row(country_iso3="NER", currency="XOF"),
    Row(country_iso3="BFA", currency="XOF"),
    Row(country_iso3="CIV", currency="XOF"),
    Row(country_iso3="GIN", currency="GNF"),
    Row(country_iso3="SLE", currency="SLL"),
    Row(country_iso3="GMB", currency="GMD"),
    Row(country_iso3="LBR", currency="LRD"),
    Row(country_iso3="COD", currency="CDF"),
    Row(country_iso3="COG", currency="XAF"),
    Row(country_iso3="CAF", currency="XAF"),
    Row(country_iso3="CMR", currency="XAF"),
    Row(country_iso3="GAB", currency="XAF"),
    Row(country_iso3="GNQ", currency="XAF"),
    Row(country_iso3="STP", currency="STN"),
    Row(country_iso3="CPV", currency="CVE"),
    Row(country_iso3="MUS", currency="MUR"),
    Row(country_iso3="SYC", currency="SCR"),
    Row(country_iso3="NAM", currency="NAD"),
    Row(country_iso3="BWA", currency="BWP"),
    Row(country_iso3="ZMB", currency="ZMW"),
    Row(country_iso3="ZWE", currency="ZWL"),
    Row(country_iso3="MWI", currency="MWK"),
    Row(country_iso3="MOZ", currency="MZN"),
    Row(country_iso3="DJI", currency="DJF"),
    Row(country_iso3="ERI", currency="ERN"),
    Row(country_iso3="SOM", currency="SOS"),
    
    # --- Ameryki ---
    Row(country_iso3="USA", currency="USD"),
    Row(country_iso3="CAN", currency="CAD"),
    Row(country_iso3="MEX", currency="MXN"),
    Row(country_iso3="ARG", currency="ARS"),
    Row(country_iso3="BRA", currency="BRL"),
    Row(country_iso3="CHL", currency="CLP"),
    Row(country_iso3="COL", currency="COP"),
    Row(country_iso3="PER", currency="PEN"),
    Row(country_iso3="URY", currency="UYU"),
    Row(country_iso3="PRY", currency="PYG"),
    Row(country_iso3="BOL", currency="BOB"),
    Row(country_iso3="ECU", currency="USD"),
    Row(country_iso3="VEN", currency="VES"),
    Row(country_iso3="CRI", currency="CRC"),
    Row(country_iso3="PAN", currency="PAB"),
    Row(country_iso3="DOM", currency="DOP"),
    Row(country_iso3="HTI", currency="HTG"),
    Row(country_iso3="JAM", currency="JMD"),
    Row(country_iso3="CUB", currency="CUP"),
    Row(country_iso3="BHS", currency="BSD"),
    Row(country_iso3="TTO", currency="TTD"),
    Row(country_iso3="BRB", currency="BBD"),
    Row(country_iso3="GRD", currency="XCD"),
    Row(country_iso3="DMA", currency="XCD"),
    Row(country_iso3="LCA", currency="XCD"),
    Row(country_iso3="VCT", currency="XCD"),
    Row(country_iso3="KNA", currency="XCD"),
    
    # --- Oceania ---
    Row(country_iso3="AUS", currency="AUD"),
    Row(country_iso3="NZL", currency="NZD"),
    Row(country_iso3="FJI", currency="FJD"),
    Row(country_iso3="PNG", currency="PGK"),
    Row(country_iso3="VUT", currency="VUV"),
    Row(country_iso3="WSM", currency="WST"),
    Row(country_iso3="TON", currency="TOP"),
    
    # --- Terytoria / techniczne / nieidentyfikowalne ---
    Row(country_iso3="GIB", currency="GIP"),
    Row(country_iso3="GRL", currency="DKK"),
    Row(country_iso3="CUW", currency="ANG"),
    Row(country_iso3="ABW", currency="AWG"),
    Row(country_iso3="CYM", currency="KYD"),
    Row(country_iso3="BES", currency="USD"),
    Row(country_iso3="VGB", currency="USD"),
    
    Row(country_iso3="UNIDENTIFIED", currency=None),
    Row(country_iso3="S19", currency=None),
    Row(country_iso3="W00", currency=None),
    Row(country_iso3="XX", currency=None),
    Row(country_iso3="SCG", currency=None),
    Row(country_iso3="ANT", currency=None),
    Row(country_iso3="ATF", currency=None),
    Row(country_iso3="ATA", currency=None),
    Row(country_iso3="BVT", currency=None),
    Row(country_iso3="UMI", currency=None),
    Row(country_iso3="IOT", currency=None),
    Row(country_iso3="WLF", currency=None),
    Row(country_iso3="ESH", currency=None)
 
]
df_dim_currency = spark.createDataFrame(data)

df_dim_currency \
    .filter(col("country_iso3").isin("GBR", "USA", "FRA", "NOR", "SWE", "HUN", "CHE")) \
    .show()
print('Stworzono dim currency')

+------------+--------+
|country_iso3|currency|
+------------+--------+
|         FRA|     EUR|
|         GBR|     GBP|
|         NOR|     NOK|
|         SWE|     SEK|
|         CHE|     CHF|
|         HUN|     HUF|
|         USA|     USD|
+------------+--------+

Stworzono dim currency


In [20]:
print("\n=== LOAD CURRENCY PARQUET ===")

parquet_path = "/user/vagrant/project/NBP/currency_all.parquet"
print("Parquet path:", parquet_path)

currency = load_joined_df(parquet_path)

print("\n=== BASIC CHECK ===")
print("Row count:", currency.count())
print("Columns:", currency.columns)

print("Schema:")
currency.printSchema()

print("Sample rows:")
currency.show(5, truncate=False)


print("\n=== DATE TRANSFORMATION ===")

currency = (
    currency
    .withColumn("date", to_date(col("date"), "yyyy-MM-dd"))
    .withColumn("year", year(col("date")))
    .withColumn("month", month(col("date")))
)

print("Date conversion done")

print("Date sanity check:")
currency.select("date", "year", "month") \
    .orderBy("date") \
    .show(10, truncate=False)

print("Null dates check:")
currency.filter(col("date").isNull()).show(5, truncate=False)


print("\n=== MONTHLY AGGREGATION ===")

currency_data = (
    currency
    .groupBy("currency", "year", "month")
    .agg(
        min("rate").alias("min_rate"),
        max("rate").alias("max_rate"),
        avg("rate").alias("avg_rate")
    )
    .orderBy("currency", "year", "month")
)

print("Monthly aggregation done")

print("Row count after aggregation:", currency_data.count())

print("Schema:")
currency_data.printSchema()

print("Sample aggregated rows:")
currency_data.show(20, truncate=False)

print("\n=== FINAL CHECKPOINT (CURRENCY) ===")
print("Currencies available:")
currency_data.select("currency").distinct().orderBy("currency").show(truncate=False)

print("Years range:")
currency_data.select("year").distinct().orderBy("year").show()

print("Currency monthly aggregation finished")



=== LOAD CURRENCY PARQUET ===
Parquet path: /user/vagrant/project/NBP/currency_all.parquet

=== LOAD PARQUET ===
Path: /user/vagrant/project/NBP/currency_all.parquet
Parquet wczytany
Liczba kolumn: 3
Przykładowe kolumny: ['currency', 'date', 'rate']
Schema:
root
 |-- currency: string (nullable = true)
 |-- date: string (nullable = true)
 |-- rate: double (nullable = true)

Sample rows:
+--------+----------+------+
|currency|date      |rate  |
+--------+----------+------+
|USD     |2021-01-04|3.6998|
|USD     |2021-01-05|3.7031|
|USD     |2021-01-07|3.6656|
|USD     |2021-01-08|3.6919|
|USD     |2021-01-11|3.7271|
+--------+----------+------+
only showing top 5 rows


=== BASIC CHECK ===
Row count: 166407
Columns: ['currency', 'date', 'rate']
Schema:
root
 |-- currency: string (nullable = true)
 |-- date: string (nullable = true)
 |-- rate: double (nullable = true)

Sample rows:
+--------+----------+------+
|currency|date      |rate  |
+--------+----------+------+
|USD     |2021-01-04|

                                                                                

+----------+----+-----+
|date      |year|month|
+----------+----+-----+
|2002-01-02|2002|1    |
|2002-01-02|2002|1    |
|2002-01-02|2002|1    |
|2002-01-02|2002|1    |
|2002-01-02|2002|1    |
|2002-01-02|2002|1    |
|2002-01-02|2002|1    |
|2002-01-02|2002|1    |
|2002-01-02|2002|1    |
|2002-01-02|2002|1    |
+----------+----+-----+
only showing top 10 rows

Null dates check:


                                                                                

+--------+----+----+----+-----+
|currency|date|rate|year|month|
+--------+----+----+----+-----+
+--------+----+----+----+-----+


=== MONTHLY AGGREGATION ===
Monthly aggregation done


                                                                                

Row count after aggregation: 7910
Schema:
root
 |-- currency: string (nullable = true)
 |-- year: integer (nullable = true)
 |-- month: integer (nullable = true)
 |-- min_rate: double (nullable = true)
 |-- max_rate: double (nullable = true)
 |-- avg_rate: double (nullable = true)

Sample aggregated rows:


                                                                                

+--------+----+-----+--------+--------+------------------+
|currency|year|month|min_rate|max_rate|avg_rate          |
+--------+----+-----+--------+--------+------------------+
|AUD     |2002|1    |2.0227  |2.1608  |2.1002409090909095|
|AUD     |2002|2    |2.1124  |2.1708  |2.14745           |
|AUD     |2002|3    |2.1496  |2.2054  |2.172628571428571 |
|AUD     |2002|4    |2.1425  |2.1938  |2.1731666666666674|
|AUD     |2002|5    |2.1361  |2.29    |2.2294050000000003|
|AUD     |2002|6    |2.2494  |2.3277  |2.291505          |
|AUD     |2002|7    |2.2061  |2.3882  |2.280417391304348 |
|AUD     |2002|8    |2.1959  |2.3079  |2.2618714285714288|
|AUD     |2002|9    |2.2405  |2.3013  |2.2701666666666673|
|AUD     |2002|10   |2.2345  |2.2947  |2.267004347826087 |
|AUD     |2002|11   |2.1852  |2.2451  |2.2180789473684213|
|AUD     |2002|12   |2.1605  |2.246   |2.2059100000000003|
|AUD     |2003|1    |2.1603  |2.3328  |2.234272727272727 |
|AUD     |2003|2    |2.236   |2.37    |2.300614999999999

                                                                                

+--------+
|currency|
+--------+
|AUD     |
|BRL     |
|CAD     |
|CHF     |
|CLP     |
|CNY     |
|CZK     |
|DKK     |
|EUR     |
|GBP     |
|HKD     |
|HUF     |
|IDR     |
|ILS     |
|INR     |
|ISK     |
|JPY     |
|KRW     |
|MXN     |
|MYR     |
+--------+
only showing top 20 rows

Years range:




+----+
|year|
+----+
|2002|
|2003|
|2004|
|2005|
|2006|
|2007|
|2008|
|2009|
|2010|
|2011|
|2012|
|2013|
|2014|
|2015|
|2016|
|2017|
|2018|
|2019|
|2020|
|2021|
+----+
only showing top 20 rows

Currency monthly aggregation finished


                                                                                

In [23]:
print("\n=== START CURRENCY YEARLY AGGREGATION ===")

print("Input row count:", currency.count())
print("Currencies available:")
currency.select("currency").distinct().orderBy("currency").show(truncate=False)

print("Years available:")
currency.select("year").distinct().orderBy("year").show(30)

print("\n=== GROUP BY CURRENCY / YEAR ===")

currency_yearly = (
    currency
    .groupBy("currency", "year")
    .agg(
        min("rate").alias("min_rate"),
        max("rate").alias("max_rate"),
        avg("rate").alias("avg_rate")
    )
    .orderBy("currency", "year")
)

print("Yearly aggregation done")
print("Row count after aggregation:", currency_yearly.count())

print("Schema:")
currency_yearly.printSchema()

print("Sample yearly rows:")
currency_yearly.show(20, truncate=False)

print("\n=== SANITY CHECK: MIN / AVG / MAX ===")

currency_yearly.filter(
    (col("avg_rate") < col("min_rate")) |
    (col("avg_rate") > col("max_rate"))
).show(truncate=False)

print("If empty → sanity check passed")

print("\n=== FINAL CHECKPOINT (CURRENCY YEARLY) ===")

print("Final yearly row count:", currency_yearly.count())

print("Years range:")
currency_yearly.select("year").distinct().orderBy("year").show()
currency_yearly.printSchema()

print("Currency yearly aggregation finished")





=== START CURRENCY YEARLY AGGREGATION ===
Input row count: 166407
Currencies available:


                                                                                

+--------+
|currency|
+--------+
|AUD     |
|BRL     |
|CAD     |
|CHF     |
|CLP     |
|CNY     |
|CZK     |
|DKK     |
|EUR     |
|GBP     |
|HKD     |
|HUF     |
|IDR     |
|ILS     |
|INR     |
|ISK     |
|JPY     |
|KRW     |
|MXN     |
|MYR     |
+--------+
only showing top 20 rows

Years available:


                                                                                

+----+
|year|
+----+
|2002|
|2003|
|2004|
|2005|
|2006|
|2007|
|2008|
|2009|
|2010|
|2011|
|2012|
|2013|
|2014|
|2015|
|2016|
|2017|
|2018|
|2019|
|2020|
|2021|
|2022|
|2023|
|2024|
|2025|
+----+


=== GROUP BY CURRENCY / YEAR ===
Yearly aggregation done


                                                                                

Row count after aggregation: 661
Schema:
root
 |-- currency: string (nullable = true)
 |-- year: integer (nullable = true)
 |-- min_rate: double (nullable = true)
 |-- max_rate: double (nullable = true)
 |-- avg_rate: double (nullable = true)

Sample yearly rows:


                                                                                

+--------+----+--------+--------+------------------+
|currency|year|min_rate|max_rate|avg_rate          |
+--------+----+--------+--------+------------------+
|AUD     |2002|2.0227  |2.3882  |2.2185633466135437|
|AUD     |2003|2.1603  |2.8604  |2.535251778656127 |
|AUD     |2004|2.3167  |3.0464  |2.6850578124999993|
|AUD     |2005|2.3158  |2.6038  |2.4654158730158717|
|AUD     |2006|2.2524  |2.4162  |2.3366333333333333|
|AUD     |2007|2.1289  |2.4544  |2.315681349206348 |
|AUD     |2008|1.7347  |2.2526  |2.0219452755905523|
|AUD     |2009|2.0758  |2.6299  |2.4465521568627455|
|AUD     |2010|2.5175  |3.0813  |2.7715913725490178|
|AUD     |2011|2.8209  |3.5088  |3.054929365079366 |
|AUD     |2012|3.1977  |3.6206  |3.3727972222222222|
|AUD     |2013|2.6771  |3.4226  |3.0599553784860563|
|AUD     |2014|2.6701  |3.0266  |2.843943650793651 |
|AUD     |2015|2.6214  |3.0638  |2.834748818897638 |
|AUD     |2016|2.7609  |3.1411  |2.9335158730158737|
|AUD     |2017|2.6818  |3.1424  |2.89489561752

                                                                                

+--------+----+--------+--------+--------+
|currency|year|min_rate|max_rate|avg_rate|
+--------+----+--------+--------+--------+
+--------+----+--------+--------+--------+

If empty → sanity check passed

=== FINAL CHECKPOINT (CURRENCY YEARLY) ===


                                                                                

Final yearly row count: 661
Years range:




+----+
|year|
+----+
|2002|
|2003|
|2004|
|2005|
|2006|
|2007|
|2008|
|2009|
|2010|
|2011|
|2012|
|2013|
|2014|
|2015|
|2016|
|2017|
|2018|
|2019|
|2020|
|2021|
+----+
only showing top 20 rows

root
 |-- currency: string (nullable = true)
 |-- year: integer (nullable = true)
 |-- min_rate: double (nullable = true)
 |-- max_rate: double (nullable = true)
 |-- avg_rate: double (nullable = true)

Currency yearly aggregation finished


                                                                                

## `WDI`

In [29]:
print("\n=== LOAD WDI PARQUET ===")

input_path = "hdfs://localhost:8020/user/vagrant/project/WDI/*.parquet"
print("Input path:", input_path)

wdi = (
    spark.read.parquet(input_path)
    .withColumn("value", col("value").cast("double"))
    .withColumn("year", col("year").cast("integer"))
)

print("WDI loaded")
print("Row count:", wdi.count())
print("Columns:", wdi.columns)

print("Schema:")
wdi.printSchema()

print("Sample rows:")
wdi.show(5, truncate=False)
print("\n=== YEAR RANGE (INPUT WDI) ===")

wdi.select(
    min("year").alias("min_year"),
    max("year").alias("max_year")
).show()

print("Distinct years:")
wdi.select("year").distinct().orderBy("year").show(25)

print("\n=== PIVOT WDI ===")

pivoted_wdi = (
    wdi.groupBy("country_id", "country_name", "year")
       .pivot("indicator_name")
       .agg(first("value"))
)

print("Pivot done")
print("Row count after pivot:", pivoted_wdi.count())

print("Schema after pivot:")
pivoted_wdi.printSchema()

print("Sample pivoted rows:")
pivoted_wdi.show(5, truncate=False)

print("\n=== RENAME INDICATORS ===")

rename_map = {
    "External debt stocks, total (DOD, current US$)": "external_debt",
    "GDP (current US$)": "gdp",
    "Imports of goods and services (current US$)": "import",
    "Industry (including construction), value added (% of GDP)": "industry_in_gdp",
    "Inflation, consumer prices (annual %)": "inflation",
    "Population, total": "population",
    "Services, value added (% of GDP)": "services_in_gdp",
    "Trade (% of GDP)": "trade_in_gdp"
}

for old_name, new_name in rename_map.items():
    if old_name in pivoted_wdi.columns:
        print(f"Renaming: {old_name} → {new_name}")
        pivoted_wdi = pivoted_wdi.withColumnRenamed(old_name, new_name)
    else:
        print(f"Column not found (skipped): {old_name}")


print("Columns after rename:")
print(pivoted_wdi.columns)

print("\n=== CALCULATED INDICATORS ===")

final_wdi = (
    pivoted_wdi
    .withColumn(
        "gdp_per_capita",
        round(col("gdp") / col("population"), 2)
    )
    .drop("indicator_name")
)

print("Calculated gdp_per_capita")

final_wdi.select(
    "country_name", "year", "gdp", "population", "gdp_per_capita"
).show(10, truncate=False)


print("\n=== YEAR RANGE (FINAL WDI) ===")

final_wdi.select(
    spark_min("year").alias("min_year"),
    spark_max("year").alias("max_year")
).show()

print("Years available:")
final_wdi.select("year").distinct().orderBy("year").show(25)


print("\n=== POLAND CHECK ===")

poland_df = final_wdi.filter(col("country_name") == "Poland")

print("Poland row count:", poland_df.count())

poland_df.select(
    spark_min("year").alias("min_year"),
    spark_max("year").alias("max_year")
).show()

poland_df.orderBy("year").show(15, truncate=False)

print("\n=== FINAL WDI CHECKPOINT ===")

final_wdi = final_wdi.orderBy("country_name", "year")

print("Final schema:")
final_wdi.printSchema()

print("Final DataFrame count:", final_wdi.count())

print("WDI pipeline finished successfully")





=== LOAD WDI PARQUET ===
Input path: hdfs://localhost:8020/user/vagrant/project/WDI/*.parquet
WDI loaded
Row count: 158908
Columns: ['country_id', 'country_name', 'year', 'indicator_id', 'indicator_name', 'value']
Schema:
root
 |-- country_id: string (nullable = true)
 |-- country_name: string (nullable = true)
 |-- year: integer (nullable = true)
 |-- indicator_id: string (nullable = true)
 |-- indicator_name: string (nullable = true)
 |-- value: double (nullable = true)

Sample rows:
+----------+------------+----+--------------+-------------------------------------+----------------+
|country_id|country_name|year|indicator_id  |indicator_name                       |value           |
+----------+------------+----+--------------+-------------------------------------+----------------+
|country_id|country_name|null|indicator_id  |indicator_name                       |null            |
|NL        |Netherlands |2023|FP.CPI.TOTL.ZG|Inflation, consumer prices (annual %)|3.83839354342808|
|co

                                                                                

+--------+--------+
|min_year|max_year|
+--------+--------+
|    2003|    2023|
+--------+--------+

Distinct years:


                                                                                

+----+
|year|
+----+
|null|
|2003|
|2004|
|2005|
|2006|
|2007|
|2008|
|2009|
|2010|
|2011|
|2012|
|2013|
|2014|
|2015|
|2016|
|2017|
|2018|
|2019|
|2020|
|2021|
|2022|
|2023|
+----+


=== PIVOT WDI ===


                                                                                

Pivot done


                                                                                

Row count after pivot: 5587
Schema after pivot:
root
 |-- country_id: string (nullable = true)
 |-- country_name: string (nullable = true)
 |-- year: integer (nullable = true)
 |-- External debt stocks, total (DOD, current US$): double (nullable = true)
 |-- GDP (current US$): double (nullable = true)
 |-- Imports of goods and services (current US$): double (nullable = true)
 |-- Industry (including construction), value added (% of GDP): double (nullable = true)
 |-- Inflation, consumer prices (annual %): double (nullable = true)
 |-- Population, total: double (nullable = true)
 |-- Services, value added (% of GDP): double (nullable = true)
 |-- Trade (% of GDP): double (nullable = true)
 |-- indicator_name: double (nullable = true)

Sample pivoted rows:


                                                                                

+----------+------------+----+----------------------------------------------+-------------------+-------------------------------------------+---------------------------------------------------------+-------------------------------------+-----------------+--------------------------------+----------------+--------------+
|country_id|country_name|year|External debt stocks, total (DOD, current US$)|GDP (current US$)  |Imports of goods and services (current US$)|Industry (including construction), value added (% of GDP)|Inflation, consumer prices (annual %)|Population, total|Services, value added (% of GDP)|Trade (% of GDP)|indicator_name|
+----------+------------+----+----------------------------------------------+-------------------+-------------------------------------------+---------------------------------------------------------+-------------------------------------+-----------------+--------------------------------+----------------+--------------+
|SZ        |Eswatini    |2003|4.53448

                                                                                

+-------------------------+----+-------------------+-------------+--------------+
|country_name             |year|gdp                |population   |gdp_per_capita|
+-------------------------+----+-------------------+-------------+--------------+
|Eswatini                 |2003|2.14963243327703E9 |1066700.0    |2015.22       |
|Timor-Leste              |2018|1.55598861443594E9 |1275959.0    |1219.47       |
|Kiribati                 |2012|2.07001545867109E8 |112284.0     |1843.55       |
|Portugal                 |2019|2.40115970063019E11|1.0286263E7  |23343.36      |
|Spain                    |2007|1.47674627685451E12|4.5226803E7  |32652.02      |
|Croatia                  |2008|6.84728546172831E10|4309705.0    |15888.06      |
|Middle income            |2022|3.65287677581778E13|5.846464711E9|6248.01       |
|Sint Maarten (Dutch part)|2014|1.36181150837989E9 |36672.0      |37134.91      |
|Marshall Islands         |2007|1.505E8            |52035.0      |2892.28       |
|Venezuela, RB  

NameError: name 'spark_max' is not defined

In [24]:
input_path = "hdfs://localhost:8020/user/vagrant/project/WDI/*.parquet"

# Load data and map types
wdi = spark.read.parquet(input_path) \
    .withColumn("value", col("value").cast("double")) \
    .withColumn("year", col("year").cast("integer"))

# Pivot table
pivoted_wdi = wdi.groupBy("country_id", "country_name", "year") \
    .pivot("indicator_name") \
    .agg(first("value"))

# Renaming columns
rename_map = {
    "External debt stocks, total (DOD, current US$)": "external_debt",
    "GDP (current US$)": "gdp",
    "Imports of goods and services (current US$)": "import",
    "Industry (including construction), value added (% of GDP)": "industry_in_gdp",
    "Inflation, consumer prices (annual %)": "inflation",
    "Population, total": "population",
    "Services, value added (% of GDP)": "services_in_gdp",
    "Trade (% of GDP)": "trade_in_gdp"
}

for old_name, new_name in rename_map.items():
    pivoted_wdi = pivoted_wdi.withColumnRenamed(old_name, new_name)

# Calculate new indicators
# window_spec = Window.partitionBy("country_id").orderBy("year")

final_wdi = pivoted_wdi \
    .withColumn("gdp_per_capita", round(col("gdp") / col("population"), 2)) \
        .drop("indicator_name")
    # .withColumn("prev_year_gdp", lag("gdp").over(window_spec)) \
    # .withColumn("gdp_growth", round(((col("gdp") - col("prev_year_gdp")) / col("prev_year_gdp")) * 100, 2)) \
        # .drop("prev_year_gdp") \



final_wdi = final_wdi.orderBy("country_name", "year")
final_wdi.printSchema()
print("DataFrame count:", final_wdi.count())
poland_df = final_wdi.filter(col("country_name") == "Poland")
poland_df.show(15)

                                                                                

root
 |-- country_id: string (nullable = true)
 |-- country_name: string (nullable = true)
 |-- year: integer (nullable = true)
 |-- external_debt: double (nullable = true)
 |-- gdp: double (nullable = true)
 |-- import: double (nullable = true)
 |-- industry_in_gdp: double (nullable = true)
 |-- inflation: double (nullable = true)
 |-- population: double (nullable = true)
 |-- services_in_gdp: double (nullable = true)
 |-- trade_in_gdp: double (nullable = true)
 |-- gdp_per_capita: double (nullable = true)



                                                                                

DataFrame count: 5587




+----------+------------+----+-------------+-------------------+-------------------+----------------+------------------+-----------+----------------+----------------+--------------+
|country_id|country_name|year|external_debt|                gdp|             import| industry_in_gdp|         inflation| population| services_in_gdp|    trade_in_gdp|gdp_per_capita|
+----------+------------+----+-------------+-------------------+-------------------+----------------+------------------+-----------+----------------+----------------+--------------+
|        PL|      Poland|2003|         null| 2.1856122599847E11|7.86269228544063E10|27.5111441045363| 0.682701375787681| 3.820457E7|57.9482847667238|69.2134479841788|       5720.81|
|        PL|      Poland|2004|         null|2.56268656145134E11|9.46128680718343E10| 29.185278751871|  3.38264681884691|3.8182222E7|55.9833742114646|70.9815765694162|       6711.73|
|        PL|      Poland|2005|         null|3.06999913150525E11|1.09863040541397E11| 29.05

                                                                                

## Joining data into monthly and yearly tabels

### `MONTHLY TABLE`

In [33]:
print("\n=== JOIN COMTRADE WITH CURRENCY INFO ===")

# join z df_dim_currency (country ISO)
comtrade_with_currency = (
    comtrade_monthly
    .join(
        df_dim_currency,
        comtrade_monthly.partnerISO == df_dim_currency.country_iso3,
        how="left"
    )
    .drop(df_dim_currency.country_iso3)
)

print("After join with country info:")
print("Row count:", comtrade_with_currency.count())
print("Columns:", comtrade_with_currency.columns)
comtrade_with_currency.select("partnerISO", "currency").distinct().show(20, truncate=False)

# join z kursami walut (miesięczne)
print("\n=== JOIN WITH MONTHLY CURRENCY RATES ===")

comtrade_with_rates = (
    comtrade_with_currency
    .join(
        currency_data,
        on=["currency", "year", "month"],
        how="left"
    )
)

print("After join with currency rates:")
print("Row count:", comtrade_with_rates.count())
comtrade_with_rates.select("currency", "year", "month", "commodity_desc", "partnerISO", "max_rate").show(10, truncate=False)



# filtr lat 2004–2023
comtrade_with_rates = comtrade_with_rates.filter((col("year") >= 2004) & (col("year") <= 2023))

print("\nAfter calculating value_local_currency and filtering years 2004-2023:")
print("Row count:", comtrade_with_rates.count())
print("Years range:")
comtrade_with_rates.selectExpr("min(year) as min_year", "max(year) as max_year").show()

print("Sample rows:")
comtrade_with_rates.orderBy("year", "month").show(10, truncate=False)

print("\nSchema after all joins:")
comtrade_with_rates.printSchema()

montly_agg = comtrade_with_rates
print("\nCreated monthly Comtrade with currency rates (2004-2023)")



=== JOIN COMTRADE WITH CURRENCY INFO ===
After join with country info:


                                                                                

Row count: 143342
Columns: ['commodity_desc', 'data_period', 'hs_code', 'primary_value_usd', 'partner_code', 'partnerISO', 'quantity', 'quantity_code', 'weight', 'unit_value_usd', 'usd_per_kg', 'year', 'month', 'quarter', 'quarter_label', 'month_world_export_value', 'share_of_month_market', 'currency']
+----------+--------+
|partnerISO|currency|
+----------+--------+
|NIU       |null    |
|HTI       |HTG     |
|PSE       |ILS     |
|BRB       |BBD     |
|LVA       |EUR     |
|JAM       |JMD     |
|ZMB       |ZMW     |
|BRA       |BRL     |
|ARM       |AMD     |
|MOZ       |MZN     |
|CUB       |CUP     |
|JOR       |JOD     |
|ABW       |AWG     |
|FRA       |EUR     |
|SOM       |SOS     |
|BRN       |null    |
|COD       |CDF     |
|BOL       |BOB     |
|URY       |UYU     |
|GIB       |GIP     |
+----------+--------+
only showing top 20 rows


=== JOIN WITH MONTHLY CURRENCY RATES ===
After join with currency rates:


                                                                                

Row count: 143342


                                                                                

+--------+----+-----+-----------------------------------------------------------------------------------------------------------------------------------------------------------+----------+--------+
|currency|year|month|commodity_desc                                                                                                                                             |partnerISO|max_rate|
+--------+----+-----+-----------------------------------------------------------------------------------------------------------------------------------------------------------+----------+--------+
|null    |2021|1    |Vehicles; with only spark-ignition internal combustion reciprocating piston engine, cylinder capacity over 1500 but not over 3000cc                        |NIU       |null    |
|HTG     |2010|4    |Medicaments; consisting of mixed or unmixed products n.e.c. in heading no. 3004, for therapeutic or prophylactic uses, packaged for retail sale            |HTI       |null    |
|HTG     |

                                                                                

Row count: 129298
Years range:


                                                                                

+--------+--------+
|min_year|max_year|
+--------+--------+
|    2004|    2023|
+--------+--------+

Sample rows:




+--------+----+-----+-----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+-----------+-------+-----------------+------------+----------+---------+-------------+---------+-------------------+-------------------+-------+-------------+------------------------+---------------------+--------+--------+-----------------+
|currency|year|month|commodity_desc                                                                                                                                                                                                               |data_period|hs_code|primary_value_usd|partner_code|partnerISO|quantity |quantity_code|weight   |unit_value_usd     |usd_per_kg         |quarter|quarter_label|month_world_export_value|share_of_month_market|min_rate|max_rate|avg_rate         |
+--------+----+-----+---------

                                                                                

### `YEARLY TABLE`

In [40]:
print("\n=== JOIN COMTRADE YEARLY WITH CURRENCY AND WDI ===")

# join z df_dim_currency (country ISO → currency)
comtrade_with_currency = (
    df_yearly
    .join(
        df_dim_currency,
        df_yearly.partnerISO == df_dim_currency.country_iso3,
        how="left"
    )
    .drop(df_dim_currency.country_iso3)
)

print("After join with country info:")
print("Row count:", comtrade_with_currency.count())
print("Columns:", comtrade_with_currency.columns)
comtrade_with_currency.select("partnerISO", "currency").distinct().show(20, truncate=False)

# join z kursami walut rocznymi
print("\n=== JOIN WITH YEARLY CURRENCY RATES ===")

comtrade_with_rates = (
    comtrade_with_currency
    .join(
        currency_yearly,
        on=["currency", "year"],
        how="left"
    ).drop(currency_yearly.year)
)

print("After join with currency rates:")
print("Row count:", comtrade_with_rates.count())
currencies = ["EUR", "USD", "HUF", "CHF"]

comtrade_with_rates.filter(
    col("currency").isin(currencies)
).select(
    "currency", "year", "partnerISO", "avg_rate", "min_rate", "max_rate"
).show(10, truncate=False)

# join z danymi WDI
print("\n=== JOIN WITH WDI DATA ===")

comtrade_with_wdi = (
    comtrade_with_rates
    .join(
        final_wdi,
        (comtrade_with_rates.partnerISO == final_wdi.country_id) &
        (comtrade_with_rates.year == final_wdi.year),
        how="left"
    ).drop(final_wdi.year)
    
)

print("After join with WDI:")
print("Row count:", comtrade_with_wdi.count())
print("Columns:", comtrade_with_wdi.columns)

# filtr lat 2004–2023
comtrade_with_wdi = comtrade_with_wdi.filter((col("year") >= 2004) & (col("year") <= 2023))

print("\nAfter filtering years 2004-2023:")
print("Row count:", comtrade_with_wdi.count())
print("Years range:")
comtrade_with_wdi.selectExpr("min(year) as min_year", "max(year) as max_year").show()

# sanity check – kilka przykładowych wierszy
print("Sample rows:")
comtrade_with_wdi.orderBy("year").show(10, truncate=False)

print("\nSchema after all joins:")
comtrade_with_wdi.printSchema()

print("\nCreated yearly Comtrade with currency rates and WDI (2004-2023)")



=== JOIN COMTRADE YEARLY WITH CURRENCY AND WDI ===
After join with country info:


                                                                                

Row count: 19597
Columns: ['year', 'partnerISO', 'commodity_desc', 'hs_code', 'annual_value_usd', 'quantity', 'weight', 'unit_value_usd', 'usd_per_kg', 'world_export_value', 'share_of_year_market', 'currency']


                                                                                

+----------+--------+
|partnerISO|currency|
+----------+--------+
|NIU       |null    |
|HTI       |HTG     |
|PSE       |ILS     |
|BRB       |BBD     |
|LVA       |EUR     |
|JAM       |JMD     |
|ZMB       |ZMW     |
|BRA       |BRL     |
|ARM       |AMD     |
|MOZ       |MZN     |
|CUB       |CUP     |
|JOR       |JOD     |
|ABW       |AWG     |
|FRA       |EUR     |
|SOM       |SOS     |
|BRN       |null    |
|COD       |CDF     |
|BOL       |BOB     |
|URY       |UYU     |
|GIB       |GIP     |
+----------+--------+
only showing top 20 rows


=== JOIN WITH YEARLY CURRENCY RATES ===
After join with currency rates:


                                                                                

Row count: 19597


                                                                                

+--------+----+----------+------------------+--------+--------+
|currency|year|partnerISO|avg_rate          |min_rate|max_rate|
+--------+----+----------+------------------+--------+--------+
|EUR     |2019|FIN       |4.298836653386455 |4.2406  |4.3891  |
|EUR     |2011|FIN       |4.11959484126984  |3.8403  |4.5642  |
|EUR     |2008|FIN       |3.5128791338582666|3.2026  |4.1848  |
|EUR     |2019|FIN       |4.298836653386455 |4.2406  |4.3891  |
|EUR     |2024|FIN       |4.3065317460317445|4.2499  |4.4016  |
|EUR     |2022|FIN       |4.68759880952381  |4.4879  |4.9647  |
|EUR     |2009|FIN       |4.328159215686276 |3.917   |4.8999  |
|EUR     |2014|FIN       |4.1844944444444465|4.0998  |4.3138  |
|EUR     |2020|FIN       |4.444947058823527 |4.2279  |4.633   |
|EUR     |2013|FIN       |4.197641832669325 |4.0671  |4.3432  |
+--------+----+----------+------------------+--------+--------+
only showing top 10 rows


=== JOIN WITH WDI DATA ===
After join with WDI:


                                                                                

Row count: 19597
Columns: ['currency', 'year', 'partnerISO', 'commodity_desc', 'hs_code', 'annual_value_usd', 'quantity', 'weight', 'unit_value_usd', 'usd_per_kg', 'world_export_value', 'share_of_year_market', 'min_rate', 'max_rate', 'avg_rate', 'country_id', 'country_name', 'external_debt', 'gdp', 'import', 'industry_in_gdp', 'inflation', 'population', 'services_in_gdp', 'trade_in_gdp', 'gdp_per_capita']

After filtering years 2004-2023:


                                                                                

Row count: 17618
Years range:


                                                                                

+--------+--------+
|min_year|max_year|
+--------+--------+
|    2004|    2023|
+--------+--------+

Sample rows:




+--------+----+----------+-----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+-------+----------------+----------+----------+------------------+------------------+------------------+---------------------+--------+--------+-----------------+----------+------------+-------------+----+------+---------------+---------+----------+---------------+------------+--------------+
|currency|year|partnerISO|commodity_desc                                                                                                                                                                                                               |hs_code|annual_value_usd|quantity  |weight    |unit_value_usd    |usd_per_kg        |world_export_value|share_of_year_market |min_rate|max_rate|avg_rate         |country_id|country_name|external_debt|gdp |impor

                                                                                

## Saving combined data in HDFS

In [48]:

print("\n=== FINAL CHECKS BEFORE SAVING ===")

# Comtrade yearly with WDI
print("\n--- Comtrade yearly with WDI ---")
print("Row count:", comtrade_with_wdi.count())
print("Columns:", comtrade_with_wdi.columns)
print("Years range:")
comtrade_with_wdi.selectExpr("min(year) as min_year", "max(year) as max_year").show()
print("Sample rows:")
comtrade_with_wdi.show(10, truncate=False)

# Comtrade monthly / currency
print("\n--- Comtrade monthly with currency ---")
comtrade_with_currency = montly_agg  # zakładam, że masz monthly aggregated DataFrame
print("Row count:", comtrade_with_currency.count())
print("Columns:", comtrade_with_currency.columns)
print("Years range:")
comtrade_with_currency.selectExpr("min(year) as min_year", "max(year) as max_year").show()
print("Sample rows:")
comtrade_with_currency.show(10, truncate=False)

# Zapis do Parquet na HDFS
# Ścieżki HDFS

print("\n=== SAVING DATA INTO HDFS ===")
final_path = "hdfs:///user/vagrant/project/final_tables/"
tmp_wdi = final_path + "tmp_wdi"
tmp_currency = final_path + "tmp_currency"

# Zapis tymczasowy z jedną partycją
comtrade_with_wdi.coalesce(1).write.mode("overwrite").parquet(tmp_wdi)
comtrade_with_currency.coalesce(1).write.mode("overwrite").parquet(tmp_currency)

# Sprawdzenie zawartości tmp folderów
!hdfs dfs -ls {tmp_wdi}
!hdfs dfs -ls {tmp_currency}

# Przeniesienie pliku part-0000… do docelowej nazwy
!hdfs dfs -mv {tmp_wdi}/part-*.parquet {final_path}comtrade_with_wdi.parquet
!hdfs dfs -mv {tmp_currency}/part-*.parquet {final_path}comtrade_with_currency.parquet

# Usunięcie tymczasowych folderów
!hdfs dfs -rm -r -f {tmp_wdi}
!hdfs dfs -rm -r -f {tmp_currency}

# 5️⃣ Sprawdzenie finalnych plików
!hdfs dfs -ls {final_path}

print("Finished: comtrade_with_wdi.parquet and comtrade_with_currency.parquet saved as single files")
spark.stop()




=== FINAL CHECKS BEFORE SAVING ===

--- Comtrade yearly with WDI ---


                                                                                

Row count: 17618
Columns: ['currency', 'year', 'partnerISO', 'commodity_desc', 'hs_code', 'annual_value_usd', 'quantity', 'weight', 'unit_value_usd', 'usd_per_kg', 'world_export_value', 'share_of_year_market', 'min_rate', 'max_rate', 'avg_rate', 'country_id', 'country_name', 'external_debt', 'gdp', 'import', 'industry_in_gdp', 'inflation', 'population', 'services_in_gdp', 'trade_in_gdp', 'gdp_per_capita']
Years range:


                                                                                

+--------+--------+
|min_year|max_year|
+--------+--------+
|    2004|    2023|
+--------+--------+

Sample rows:


                                                                                

+--------+----+----------+-----------------------------------------------------------------------------------------------------------------------------------------------------------+-------+----------------+--------+-------+------------------+-------------------+--------------------+---------------------+--------+--------+------------------+----------+------------+-------------+----+------+---------------+---------+----------+---------------+------------+--------------+
|currency|year|partnerISO|commodity_desc                                                                                                                                             |hs_code|annual_value_usd|quantity|weight |unit_value_usd    |usd_per_kg         |world_export_value  |share_of_year_market |min_rate|max_rate|avg_rate          |country_id|country_name|external_debt|gdp |import|industry_in_gdp|inflation|population|services_in_gdp|trade_in_gdp|gdp_per_capita|
+--------+----+----------+------------------------

                                                                                

Row count: 129298
Columns: ['currency', 'year', 'month', 'commodity_desc', 'data_period', 'hs_code', 'primary_value_usd', 'partner_code', 'partnerISO', 'quantity', 'quantity_code', 'weight', 'unit_value_usd', 'usd_per_kg', 'quarter', 'quarter_label', 'month_world_export_value', 'share_of_month_market', 'min_rate', 'max_rate', 'avg_rate']
Years range:


                                                                                

+--------+--------+
|min_year|max_year|
+--------+--------+
|    2004|    2023|
+--------+--------+

Sample rows:


                                                                                

+--------+----+-----+-----------------------------------------------------------------------------------------------------------------------------------------------------------+-----------+-------+-----------------+------------+----------+--------+-------------+-------+------------------+-------------------+-------+-------------+------------------------+---------------------+--------+--------+--------+
|currency|year|month|commodity_desc                                                                                                                                             |data_period|hs_code|primary_value_usd|partner_code|partnerISO|quantity|quantity_code|weight |unit_value_usd    |usd_per_kg         |quarter|quarter_label|month_world_export_value|share_of_month_market|min_rate|max_rate|avg_rate|
+--------+----+-----+-----------------------------------------------------------------------------------------------------------------------------------------------------------+-----------

                                                                                

SLF4J: Class path contains multiple SLF4J bindings.
SLF4J: Found binding in [jar:file:/usr/local/hadoop-2.7.6/share/hadoop/common/lib/slf4j-log4j12-1.7.10.jar!/org/slf4j/impl/StaticLoggerBinder.class]
SLF4J: Found binding in [jar:file:/usr/local/apache-tez-0.9.1-bin/lib/slf4j-log4j12-1.7.10.jar!/org/slf4j/impl/StaticLoggerBinder.class]
SLF4J: See http://www.slf4j.org/codes.html#multiple_bindings for an explanation.
SLF4J: Actual binding is of type [org.slf4j.impl.Log4jLoggerFactory]
Found 2 items
-rw-r--r--   1 vagrant supergroup          0 2026-01-14 11:36 hdfs:///user/vagrant/project/final_tables/tmp_wdi/_SUCCESS
-rw-r--r--   1 vagrant supergroup     785403 2026-01-14 11:36 hdfs:///user/vagrant/project/final_tables/tmp_wdi/part-00000-97dc57c3-7f21-422e-96ce-4b253e56414a-c000.snappy.parquet
SLF4J: Class path contains multiple SLF4J bindings.
SLF4J: Found binding in [jar:file:/usr/local/hadoop-2.7.6/share/hadoop/common/lib/slf4j-log4j12-1.7.10.jar!/org/slf4j/impl/StaticLoggerBinder.cla

## Views