In [0]:
import pandas as pd
from pyspark.sql.functions import *
from pyspark.sql import Row, SparkSession
# Obtain a Spark session for this notebook (application name "lab4_1").
spark = (
    SparkSession.builder
    .appName("lab4_1")
    .getOrCreate()
)

In [0]:
# Source CSV: the 2015 flight summary (flight-data/csv/2015-summary.csv) from the
# "Spark-The-Definitive-Guide" GitHub repository.
data_link = 'https://raw.githubusercontent.com/databricks/Spark-The-Definitive-Guide/refs/heads/master/data/flight-data/csv/2015-summary.csv'

# Read the CSV from the URL with pandas, then convert the pandas frame into a Spark DataFrame.
df = pd.read_csv(data_link)
data = spark.createDataFrame(df)

# Preview the first 10 rows.
data.limit(10).show()

+-----------------+-------------------+-----+
|DEST_COUNTRY_NAME|ORIGIN_COUNTRY_NAME|count|
+-----------------+-------------------+-----+
|    United States|            Romania|   15|
|    United States|            Croatia|    1|
|    United States|            Ireland|  344|
|            Egypt|      United States|   15|
|    United States|              India|   62|
|    United States|          Singapore|    1|
|    United States|            Grenada|   62|
|       Costa Rica|      United States|  588|
|          Senegal|      United States|   40|
|          Moldova|      United States|    1|
+-----------------+-------------------+-----+



In [0]:
# isNull: show only the rows whose "count" value is NULL
display(
    data
    .select("*")
    .filter(col("count").isNull())
)

DEST_COUNTRY_NAME,ORIGIN_COUNTRY_NAME,count


In [0]:
# Three extra rows with the same column names as `data`; two of them have no "count".
new_rows = spark.createDataFrame(
    [
        ("Poland", "Japan", None),
        ("China", "Germany", 10),
        ("Slovakia", "Norway", None),
    ],
    schema=["DEST_COUNTRY_NAME", "ORIGIN_COUNTRY_NAME", "count"],
)

In [0]:
# Append the extra rows (columns are matched by name), then list the rows with a NULL "count".
# NOTE: this rebinds `data`, so running the cell a second time appends the rows again.
data = data.unionByName(new_rows)
display(
    data
    .select("*")
    .filter(col("count").isNull())
)

DEST_COUNTRY_NAME,ORIGIN_COUNTRY_NAME,count
Poland,Japan,
Slovakia,Norway,


In [0]:
# fill: replace NULLs in "count" with 0, then confirm that no NULL "count" rows remain
data = data.fillna({"count": 0})
display(
    data
    .select("*")
    .filter(col("count").isNull())
)

DEST_COUNTRY_NAME,ORIGIN_COUNTRY_NAME,count


In [0]:
# explode: produce one output row per element of the "airports" array.
# Rows whose array is NULL or empty produce no output rows.
data_temp = spark.createDataFrame(
    [
        ("USA", "China", 1000, ["JFK", "LAX"]),
        ("Germany", "France", 500, ["BER"]),
        ("Poland", None, None, None),
        (None, "Italy", 200, ["FCO", "LIN"]),
        ("UK", "Spain", None, []),
    ],
    schema=["DEST_COUNTRY_NAME", "ORIGIN_COUNTRY_NAME", "count", "airports"],
)

data_exploded = data_temp.select(
    "DEST_COUNTRY_NAME",
    "ORIGIN_COUNTRY_NAME",
    "count",
    explode(col("airports")).alias("airport"),
)
data_exploded.show()

+-----------------+-------------------+-----+-------+
|DEST_COUNTRY_NAME|ORIGIN_COUNTRY_NAME|count|airport|
+-----------------+-------------------+-----+-------+
|              USA|              China| 1000|    JFK|
|              USA|              China| 1000|    LAX|
|          Germany|             France|  500|    BER|
|             null|              Italy|  200|    FCO|
|             null|              Italy|  200|    LIN|
+-----------------+-------------------+-----+-------+



In [0]:
# drop: remove the "count" column and preview 10 rows of what is left
(
    data
    .drop("count")
    .limit(10)
    .show()
)

+-----------------+-------------------+
|DEST_COUNTRY_NAME|ORIGIN_COUNTRY_NAME|
+-----------------+-------------------+
|    United States|            Romania|
|    United States|            Croatia|
|    United States|            Ireland|
|            Egypt|      United States|
|    United States|              India|
|    United States|          Singapore|
|    United States|            Grenada|
|       Costa Rica|      United States|
|          Senegal|      United States|
|          Moldova|      United States|
+-----------------+-------------------+



In [0]:
# regexp_replace: rewrite every match of the pattern "United States" as "USA"
(
    data
    .select(
        regexp_replace(col("DEST_COUNTRY_NAME"), "United States", "USA").alias("Destination")
    )
    .limit(10)
    .show()
)

+-----------+
|Destination|
+-----------+
|        USA|
|        USA|
|        USA|
|      Egypt|
|        USA|
|        USA|
|        USA|
| Costa Rica|
|    Senegal|
|    Moldova|
+-----------+



In [0]:
# regexp_extract: return the text matched by the pattern "Egypt" (group 0 = the whole match);
# rows without a match come back as an empty string
(
    data
    .select(
        regexp_extract(col("DEST_COUNTRY_NAME"), "Egypt", 0).alias("Destination")
    )
    .limit(10)
    .show()
)

+-----------+
|Destination|
+-----------+
|           |
|           |
|           |
|      Egypt|
|           |
|           |
|           |
|           |
|           |
|           |
+-----------+



In [0]:
# ifnull: substitute 0 wherever "count" is NULL.
# Author's note (translated): borrowed from a colleague - the code shown in the documentation
# did not work because there was no `ifnull` function, hence the SQL expression via expr().
display(
    data_temp
    .withColumn("count", expr("ifnull(count, 0)"))
    .limit(10)
)


DEST_COUNTRY_NAME,ORIGIN_COUNTRY_NAME,count,airports
USA,China,1000,"List(JFK, LAX)"
Germany,France,500,List(BER)
Poland,,0,
,Italy,200,"List(FCO, LIN)"
UK,Spain,0,List()


In [0]:
# nullif: turn "count" into NULL where it equals 1; other values are kept
display(
    data
    .withColumn("count", expr("nullif(count, 1)"))
    .limit(10)
)

DEST_COUNTRY_NAME,ORIGIN_COUNTRY_NAME,count
United States,Romania,15.0
United States,Croatia,
United States,Ireland,344.0
Egypt,United States,15.0
United States,India,62.0
United States,Singapore,
United States,Grenada,62.0
Costa Rica,United States,588.0
Senegal,United States,40.0
Moldova,United States,


In [0]:
# replace: swap the exact value "United States" for "USA", but only in DEST_COUNTRY_NAME
display(
    data
    .replace("United States", "USA", subset=["DEST_COUNTRY_NAME"])
    .limit(10)
)

DEST_COUNTRY_NAME,ORIGIN_COUNTRY_NAME,count
USA,Romania,15
USA,Croatia,1
USA,Ireland,344
Egypt,United States,15
USA,India,62
USA,Singapore,1
USA,Grenada,62
Costa Rica,United States,588
Senegal,United States,40
Moldova,United States,1


In [0]:
# array_contains: flag rows whose "airports" array holds "JFK" (a NULL array gives NULL)
display(
    data_temp
    .select(array_contains(data_temp["airports"], "JFK"))
    .limit(10)
)

"array_contains(airports, JFK)"
True
False
""
False
False


In [0]:
# Aggregate functions
# avg: mean of "count" over the whole DataFrame
display(
    data.select(
        avg(col("count"))
    )
)


avg(count)
1750.2934362934363


In [0]:
# countDistinct: number of different destination countries
display(
    data.select(
        countDistinct(col("DEST_COUNTRY_NAME"))
    )
)


count(DISTINCT DEST_COUNTRY_NAME)
133


In [0]:
# groupBy: average "count" per destination country, capped at 15 groups.
# The result column keeps its Polish label (roughly "average number of arrivals").
display(
    data
    .groupBy("DEST_COUNTRY_NAME")
    .agg(avg("count").alias("Średnia ilość przylotów"))
    .limit(15)
)

DEST_COUNTRY_NAME,Średnia ilość przylotów
Anguilla,41.0
Senegal,40.0
Guyana,64.0
Turks and Caicos Islands,230.0
Algeria,4.0
United States,3290.816
Malta,1.0
Marshall Islands,42.0
Bolivia,30.0
Italy,382.0


Zadanie 2

In [0]:
# prosta funkcja dzieląca kolumnę przez 2
from pyspark.sql.types import IntegerType
def divide_flights(flight_count):
    """Halve a flight count.

    Args:
        flight_count: Number of flights, or None when the value is missing.

    Returns:
        0 for a missing value; otherwise ``flight_count / 2`` converted with
        ``int()``, i.e. truncated toward zero.
    """
    halved = 0 if flight_count is None else int(flight_count / 2)
    return halved

# Wrap the plain Python function as a Spark UDF that returns an integer column.
divide_flights_udf = udf(divide_flights, returnType=IntegerType())

# Show the original "count" next to its halved value for 10 rows.
display(
    data
    .select(
        col("count"),
        divide_flights_udf(col("count")).alias("divided_flights"),
    )
    .limit(10)
)

count,divided_flights
15,7
1,0
344,172
15,7
62,31
1,0
62,31
588,294
40,20
1,0


In [0]:
# Funkcja skracająca nazwy krajów do 3 pierwszych liter
from pyspark.sql.functions import pandas_udf
import pandas as pd

# The return type is given as the DDL string "string" rather than StringType():
# StringType is never imported explicitly in this notebook and is only in scope if it
# leaks through `from pyspark.sql.functions import *`, so relying on it is fragile.
@pandas_udf("string")
def shorten_country_name(country_series: pd.Series) -> pd.Series:
    """Shorten each country name to its first three characters.

    Args:
        country_series: Batch of country names passed in as a pandas Series.

    Returns:
        A Series of the same length holding the first three characters of each
        name (shorter names are returned unchanged).
    """
    return country_series.str[:3]

# Show each destination name next to its three-letter prefix for 10 rows.
display(
    data
    .select(
        col("DEST_COUNTRY_NAME"),
        shorten_country_name(col("DEST_COUNTRY_NAME")).alias("country_code"),
    )
    .limit(10)
)

DEST_COUNTRY_NAME,country_code
United States,Uni
United States,Uni
United States,Uni
Egypt,Egy
United States,Uni
United States,Uni
United States,Uni
Costa Rica,Cos
Senegal,Sen
Moldova,Mol
