In [0]:
from pyspark.sql import functions as F
from pyspark.sql import types as T

schema = T.StructType([
    T.StructField("dest", T.StringType()),
    T.StructField("origin", T.StringType()),
    T.StructField("flights", T.IntegerType())
])

df = (spark.read.option("header", True)
            .schema(schema)
            .csv("/FileStore/tables/2014_summary.csv"))

display(df.limit(10))



dest,origin,flights
United States,Saint Martin,1
United States,Romania,12
United States,Croatia,2
United States,Ireland,291
United States,India,62
Egypt,United States,11
United States,Grenada,47
Costa Rica,United States,529
Senegal,United States,35
United States,Sint Maarten,290


In [0]:
df = df.withColumn(
    "origin",
    F.when((F.col("dest").isin(["Costa Rica", "United States"])) & (F.col("flights") < 26) , F.lit(None)).otherwise(F.col("origin"))
)

display(df.limit(15))

dest,origin,flights
United States,,1
United States,,12
United States,,2
United States,Ireland,291
United States,India,62
Egypt,,11
United States,Grenada,47
Costa Rica,United States,529
Senegal,United States,35
United States,Sint Maarten,290


In [0]:
display(df.na.drop().limit(10))

dest,origin,flights
United States,Ireland,291
United States,India,62
United States,Grenada,47
Costa Rica,United States,529
Senegal,United States,35
United States,Sint Maarten,290
Guyana,United States,52
United States,Marshall Islands,35
Malta,United States,2
Malawi,United States,1


In [0]:
display(df.dropna().limit(10))

dest,origin,flights
United States,Ireland,291
United States,India,62
United States,Grenada,47
Costa Rica,United States,529
Senegal,United States,35
United States,Sint Maarten,290
Guyana,United States,52
United States,Marshall Islands,35
Malta,United States,2
Malawi,United States,1


In [0]:
display(df.fillna({"origin": "Dont know!!"}).limit(20))

dest,origin,flights
United States,Dont know!!,1
United States,Dont know!!,12
United States,Dont know!!,2
United States,Ireland,291
United States,India,62
Egypt,Dont know!!,11
United States,Grenada,47
Costa Rica,United States,529
Senegal,United States,35
United States,Sint Maarten,290


In [0]:


display(df.na.replace(["Gibraltar"], None).limit(10))
     


dest,origin,flights
United States,,1
United States,,12
United States,,2
United States,Ireland,291
United States,India,62
Egypt,,11
United States,Grenada,47
Costa Rica,United States,529
Senegal,United States,35
United States,Sint Maarten,290


In [0]:


display(df.replace(["United States", "Grenada"], "HAHHAHHAHAHA!").limit(10))
     


dest,origin,flights
HAHHAHHAHAHA!,,1
HAHHAHHAHAHA!,,12
HAHHAHHAHAHA!,,2
HAHHAHHAHAHA!,Ireland,291
HAHHAHHAHAHA!,India,62
Egypt,,11
HAHHAHHAHAHA!,HAHHAHHAHAHA!,47
Costa Rica,HAHHAHHAHAHA!,529
Senegal,HAHHAHHAHAHA!,35
HAHHAHHAHAHA!,Sint Maarten,290


In [0]:
display(df)

dest,origin,flights
United States,,1
United States,,12
United States,,2
United States,Ireland,291
United States,India,62
Egypt,,11
United States,Grenada,47
Costa Rica,United States,529
Senegal,United States,35
United States,Sint Maarten,290


In [0]:


df_t = df.withColumn(
    "random_id",
    F.array(F.rand(), F.rand())
)

display(df_t.limit(10))
     


dest,origin,flights,random_id
United States,,1,"List(0.3134637733888256, 0.9422142301844121)"
United States,,12,"List(0.7820705088730814, 0.7644455123283133)"
United States,,2,"List(0.619245151049554, 0.3913669294963834)"
United States,Ireland,291,"List(0.17523687526358134, 0.015299388620546694)"
United States,India,62,"List(0.5703592125997655, 0.43305257955516785)"
Egypt,,11,"List(0.7425546912906434, 0.1773088284833959)"
United States,Grenada,47,"List(0.578538529428439, 0.48192888800907074)"
Costa Rica,United States,529,"List(0.8704776205794721, 0.3762023015479772)"
Senegal,United States,35,"List(0.9798923715483692, 0.7886756741064661)"
United States,Sint Maarten,290,"List(0.19418382991287775, 0.6863130310088759)"


In [0]:
display(df_t.select("*", F.explode(F.col("random_id").alias("rid"))).drop("random_id").limit(10))

dest,origin,flights,col
United States,,1,0.3134637733888256
United States,,1,0.942214230184412
United States,,12,0.7820705088730814
United States,,12,0.7644455123283133
United States,,2,0.619245151049554
United States,,2,0.3913669294963834
United States,Ireland,291,0.1752368752635813
United States,Ireland,291,0.0152993886205466
United States,India,62,0.5703592125997655
United States,India,62,0.4330525795551678


In [0]:


df = df.withColumn(
    "dest_no_vowel",
    F.regexp_replace("dest", r"[bartekouiy]", "")
)

display(df.limit(10))
     


dest,origin,flights,dest_no_vowel
United States,,1,Und Ss
United States,,12,Und Ss
United States,,2,Und Ss
United States,Ireland,291,Und Ss
United States,India,62,Und Ss
Egypt,,11,Egp
United States,Grenada,47,Und Ss
Costa Rica,United States,529,Cs Rc
Senegal,United States,35,Sngl
United States,Sint Maarten,290,Und Ss


In [0]:
df = df.withColumn(
    "dest_last2_first2",
    F.regexp_extract("dest", r".{2}\ .{2}", 0)
)

display(df.limit(10))

dest,origin,flights,dest_no_vowel,dest_last2_first2
United States,,1,Und Ss,ed St
United States,,12,Und Ss,ed St
United States,,2,Und Ss,ed St
United States,Ireland,291,Und Ss,ed St
United States,India,62,Und Ss,ed St
Egypt,,11,Egp,
United States,Grenada,47,Und Ss,ed St
Costa Rica,United States,529,Cs Rc,ta Ri
Senegal,United States,35,Sngl,
United States,Sint Maarten,290,Und Ss,ed St


In [0]:
#ifnull nie dziala, bo za stara wersja clustra chyba, coalesce dziala wlasciwie prawie tak samo, wiec uzyje tego

df = df.withColumn(
    "temp_origin",
    F.coalesce(F.col("origin"), F.lit("Dont know!!"))
)

display(df.limit(10))

dest,origin,flights,dest_no_vowel,dest_last2_first2,temp_origin
United States,,1,Und Ss,ed St,Dont know!!
United States,,12,Und Ss,ed St,Dont know!!
United States,,2,Und Ss,ed St,Dont know!!
United States,Ireland,291,Und Ss,ed St,Ireland
United States,India,62,Und Ss,ed St,India
Egypt,,11,Egp,,Dont know!!
United States,Grenada,47,Und Ss,ed St,Grenada
Costa Rica,United States,529,Cs Rc,ta Ri,United States
Senegal,United States,35,Sngl,,United States
United States,Sint Maarten,290,Und Ss,ed St,Sint Maarten


In [0]:


display(df.withColumn("dest", F.regexp_replace("dest", "ed", "y")).limit(10))
     


dest,origin,flights,dest_no_vowel,dest_last2_first2,temp_origin
Unity States,,1,Und Ss,ed St,Dont know!!
Unity States,,12,Und Ss,ed St,Dont know!!
Unity States,,2,Und Ss,ed St,Dont know!!
Unity States,Ireland,291,Und Ss,ed St,Ireland
Unity States,India,62,Und Ss,ed St,India
Egypt,,11,Egp,,Dont know!!
Unity States,Grenada,47,Und Ss,ed St,Grenada
Costa Rica,United States,529,Cs Rc,ta Ri,United States
Senegal,United States,35,Sngl,,United States
Unity States,Sint Maarten,290,Und Ss,ed St,Sint Maarten


In [0]:
display(df.limit(10))

dest,origin,flights,dest_no_vowel,dest_last2_first2,temp_origin
United States,,1,Und Ss,ed St,Dont know!!
United States,,12,Und Ss,ed St,Dont know!!
United States,,2,Und Ss,ed St,Dont know!!
United States,Ireland,291,Und Ss,ed St,Ireland
United States,India,62,Und Ss,ed St,India
Egypt,,11,Egp,,Dont know!!
United States,Grenada,47,Und Ss,ed St,Grenada
Costa Rica,United States,529,Cs Rc,ta Ri,United States
Senegal,United States,35,Sngl,,United States
United States,Sint Maarten,290,Und Ss,ed St,Sint Maarten


In [0]:
agg_df = (
    df.groupBy("origin")
      .agg(
          F.count("flights").alias("num_records"),
          F.avg("flights").alias("avg_flights"),
          F.skewness("flights").alias("flights_skewness")
      )
      .sort(F.col("avg_flights").desc())
      .limit(10)
)

display(agg_df)

origin,num_records,avg_flights,flights_skewness
Canada,1,8177.0,
Mexico,1,6490.0,
United States,123,3235.8617886178863,10.940452773494435
United Kingdom,1,1812.0,
Japan,1,1501.0,
Germany,1,1343.0,
Dominican Republic,1,1282.0,
The Bahamas,1,991.0,
France,1,960.0,
Colombia,1,888.0,


In [0]:
display(df.groupby("origin")
          .agg(F.skewness("flights").alias("flights_skewness"))
          .sort(F.col("flights_skewness").desc())
          .limit(10))

origin,flights_skewness
United States,10.940452773494435
,0.3450098745745018
Russia,
Anguilla,
Senegal,
Sweden,
Kiribati,
Guyana,
Philippines,
Fiji,


In [0]:
simple_agg_df = (
    df.groupBy("origin")
      .agg(
          F.sum("flights").alias("total_flights"),
          F.max("flights").alias("max_flights")
      )
      .sort(F.col("total_flights").desc())
      .limit(10)
)

display(simple_agg_df)

origin,total_flights,max_flights
United States,398011,358354
Canada,8177,8177
Mexico,6490,6490
United Kingdom,1812,1812
Japan,1501,1501
Germany,1343,1343
Dominican Republic,1282,1282
The Bahamas,991,991
France,960,960
Colombia,888,888


In [0]:
from pyspark.sql.functions import udf
from pyspark.sql.types import BooleanType

def is_prime(n):
    if n is None or n < 2:
        return False
    for i in range(2, int(n ** 0.5) + 1):
        if n % i == 0:
            return False
    return True

is_prime_udf = udf(is_prime, BooleanType())

df = df.withColumn("is_prime_flights", is_prime_udf("flights"))
display(df.select("origin", "dest", "flights", "is_prime_flights"))

origin,dest,flights,is_prime_flights
,United States,1,False
,United States,12,False
,United States,2,True
Ireland,United States,291,False
India,United States,62,False
,Egypt,11,True
Grenada,United States,47,True
United States,Costa Rica,529,False
United States,Senegal,35,False
Sint Maarten,United States,290,False


In [0]:
from pyspark.sql.functions import pandas_udf
from pyspark.sql.types import IntegerType
import pandas as pd

@pandas_udf(IntegerType())
def count_vowels(col: pd.Series) -> pd.Series:
    return col.fillna("").str.lower().str.count(r"[aeiou]")

df = df.withColumn("origin_vowel_count", count_vowels("origin"))
display(df.select("origin", "origin_vowel_count"))



origin,origin_vowel_count
,0
,0
,0
Ireland,3
India,3
,0
Grenada,3
United States,5
United States,5
Sint Maarten,4
