In [0]:


from pyspark.sql.functions import (
    col, explode, regexp_replace, regexp_extract,
    coalesce, lit, stddev, when, array_contains, array, expr, avg, max, countDistinct, split, pandas_udf, udf
)

from pyspark.sql.types import(DoubleType, StringType)
import pandas as pd



In [0]:

df = spark.table("station_data_csv")
display(df.limit(20))
df.printSchema()

     

station_id,name,lat,long,dockcount,landmark,installation
2,San Jose Diridon Caltrain Station,37.329732,-121.901782,27,San Jose,8/6/2013
3,San Jose Civic Center,37.330698,-121.888979,15,San Jose,8/5/2013
4,Santa Clara at Almaden,37.333988,-121.894902,11,San Jose,8/6/2013
5,Adobe on Almaden,37.331415,-121.8932,19,San Jose,8/5/2013
6,San Pedro Square,37.336721,-121.894074,15,San Jose,8/7/2013
7,Paseo de San Antonio,37.333798,-121.886943,15,San Jose,8/7/2013
8,San Salvador at 1st,37.330165,-121.885831,15,San Jose,8/5/2013
9,Japantown,37.348742,-121.894715,15,San Jose,8/5/2013
10,San Jose City Hall,37.337391,-121.886995,15,San Jose,8/6/2013
11,MLK Library,37.335885,-121.88566,19,San Jose,8/6/2013


root
 |-- station_id: string (nullable = true)
 |-- name: string (nullable = true)
 |-- lat: string (nullable = true)
 |-- long: string (nullable = true)
 |-- dockcount: string (nullable = true)
 |-- landmark: string (nullable = true)
 |-- installation: string (nullable = true)



In [0]:
df = df.fillna({"name": "Unknown Station"})
df = df.dropna(subset=["lat", "long"])



In [0]:
from pyspark.sql.functions import regexp_replace, regexp_extract

df = df.withColumn("clean_name", regexp_replace("name", "[^a-zA-Z0-9 ]", "")) \
       .withColumn("second_word", regexp_extract("name", r"^\w+\s+(\w+)", 1))
display(df.limit(20))
df.printSchema()

station_id,name,lat,long,dockcount,landmark,installation,clean_name,second_word
2,San Jose Diridon Caltrain Station,37.329732,-121.901782,27,San Jose,8/6/2013,San Jose Diridon Caltrain Station,Jose
3,San Jose Civic Center,37.330698,-121.888979,15,San Jose,8/5/2013,San Jose Civic Center,Jose
4,Santa Clara at Almaden,37.333988,-121.894902,11,San Jose,8/6/2013,Santa Clara at Almaden,Clara
5,Adobe on Almaden,37.331415,-121.8932,19,San Jose,8/5/2013,Adobe on Almaden,on
6,San Pedro Square,37.336721,-121.894074,15,San Jose,8/7/2013,San Pedro Square,Pedro
7,Paseo de San Antonio,37.333798,-121.886943,15,San Jose,8/7/2013,Paseo de San Antonio,de
8,San Salvador at 1st,37.330165,-121.885831,15,San Jose,8/5/2013,San Salvador at 1st,Salvador
9,Japantown,37.348742,-121.894715,15,San Jose,8/5/2013,Japantown,
10,San Jose City Hall,37.337391,-121.886995,15,San Jose,8/6/2013,San Jose City Hall,Jose
11,MLK Library,37.335885,-121.88566,19,San Jose,8/6/2013,MLK Library,Library


root
 |-- station_id: string (nullable = true)
 |-- name: string (nullable = false)
 |-- lat: string (nullable = true)
 |-- long: string (nullable = true)
 |-- dockcount: string (nullable = true)
 |-- landmark: string (nullable = true)
 |-- installation: string (nullable = true)
 |-- clean_name: string (nullable = false)
 |-- second_word: string (nullable = false)



In [0]:
df = df.withColumn("name_words", split("name", " ")) \
    .withColumn("has_station", array_contains("name_words", "Station"))
display(df.limit(20))
df.printSchema()

station_id,name,lat,long,dockcount,landmark,installation,clean_name,second_word,name_words,has_station
2,San Jose Diridon Caltrain Station,37.329732,-121.901782,27,San Jose,8/6/2013,San Jose Diridon Caltrain Station,Jose,"List(San, Jose, Diridon, Caltrain, Station)",True
3,San Jose Civic Center,37.330698,-121.888979,15,San Jose,8/5/2013,San Jose Civic Center,Jose,"List(San, Jose, Civic, Center)",False
4,Santa Clara at Almaden,37.333988,-121.894902,11,San Jose,8/6/2013,Santa Clara at Almaden,Clara,"List(Santa, Clara, at, Almaden)",False
5,Adobe on Almaden,37.331415,-121.8932,19,San Jose,8/5/2013,Adobe on Almaden,on,"List(Adobe, on, Almaden)",False
6,San Pedro Square,37.336721,-121.894074,15,San Jose,8/7/2013,San Pedro Square,Pedro,"List(San, Pedro, Square)",False
7,Paseo de San Antonio,37.333798,-121.886943,15,San Jose,8/7/2013,Paseo de San Antonio,de,"List(Paseo, de, San, Antonio)",False
8,San Salvador at 1st,37.330165,-121.885831,15,San Jose,8/5/2013,San Salvador at 1st,Salvador,"List(San, Salvador, at, 1st)",False
9,Japantown,37.348742,-121.894715,15,San Jose,8/5/2013,Japantown,,List(Japantown),False
10,San Jose City Hall,37.337391,-121.886995,15,San Jose,8/6/2013,San Jose City Hall,Jose,"List(San, Jose, City, Hall)",False
11,MLK Library,37.335885,-121.88566,19,San Jose,8/6/2013,MLK Library,Library,"List(MLK, Library)",False


root
 |-- station_id: string (nullable = true)
 |-- name: string (nullable = false)
 |-- lat: string (nullable = true)
 |-- long: string (nullable = true)
 |-- dockcount: string (nullable = true)
 |-- landmark: string (nullable = true)
 |-- installation: string (nullable = true)
 |-- clean_name: string (nullable = false)
 |-- second_word: string (nullable = false)
 |-- name_words: array (nullable = false)
 |    |-- element: string (containsNull = false)
 |-- has_station: boolean (nullable = false)



In [0]:

df = df.withColumn("null_if_demo", expr("nullif(name, 'Unknown Station')")) \
       .withColumn("coalesced_name", coalesce("name", "second_word"))  

display(df.limit(20))
df.printSchema()


station_id,name,lat,long,dockcount,landmark,installation,clean_name,second_word,name_words,has_station,null_if_demo,coalesced_name
2,San Jose Diridon Caltrain Station,37.329732,-121.901782,27,San Jose,8/6/2013,San Jose Diridon Caltrain Station,Jose,"List(San, Jose, Diridon, Caltrain, Station)",True,San Jose Diridon Caltrain Station,San Jose Diridon Caltrain Station
3,San Jose Civic Center,37.330698,-121.888979,15,San Jose,8/5/2013,San Jose Civic Center,Jose,"List(San, Jose, Civic, Center)",False,San Jose Civic Center,San Jose Civic Center
4,Santa Clara at Almaden,37.333988,-121.894902,11,San Jose,8/6/2013,Santa Clara at Almaden,Clara,"List(Santa, Clara, at, Almaden)",False,Santa Clara at Almaden,Santa Clara at Almaden
5,Adobe on Almaden,37.331415,-121.8932,19,San Jose,8/5/2013,Adobe on Almaden,on,"List(Adobe, on, Almaden)",False,Adobe on Almaden,Adobe on Almaden
6,San Pedro Square,37.336721,-121.894074,15,San Jose,8/7/2013,San Pedro Square,Pedro,"List(San, Pedro, Square)",False,San Pedro Square,San Pedro Square
7,Paseo de San Antonio,37.333798,-121.886943,15,San Jose,8/7/2013,Paseo de San Antonio,de,"List(Paseo, de, San, Antonio)",False,Paseo de San Antonio,Paseo de San Antonio
8,San Salvador at 1st,37.330165,-121.885831,15,San Jose,8/5/2013,San Salvador at 1st,Salvador,"List(San, Salvador, at, 1st)",False,San Salvador at 1st,San Salvador at 1st
9,Japantown,37.348742,-121.894715,15,San Jose,8/5/2013,Japantown,,List(Japantown),False,Japantown,Japantown
10,San Jose City Hall,37.337391,-121.886995,15,San Jose,8/6/2013,San Jose City Hall,Jose,"List(San, Jose, City, Hall)",False,San Jose City Hall,San Jose City Hall
11,MLK Library,37.335885,-121.88566,19,San Jose,8/6/2013,MLK Library,Library,"List(MLK, Library)",False,MLK Library,MLK Library


root
 |-- station_id: string (nullable = true)
 |-- name: string (nullable = false)
 |-- lat: string (nullable = true)
 |-- long: string (nullable = true)
 |-- dockcount: string (nullable = true)
 |-- landmark: string (nullable = true)
 |-- installation: string (nullable = true)
 |-- clean_name: string (nullable = false)
 |-- second_word: string (nullable = false)
 |-- name_words: array (nullable = false)
 |    |-- element: string (containsNull = false)
 |-- has_station: boolean (nullable = false)
 |-- null_if_demo: string (nullable = true)
 |-- coalesced_name: string (nullable = false)



In [0]:
df.groupBy("landmark").agg(
    countDistinct("station_id").alias("unique_stations"),
    avg("lat").alias("avg_latitude"),
    stddev("long").alias("stddev_longitude")
).show()

display(df.limit(20))
df.printSchema()


+-------------+---------------+------------------+--------------------+
|     landmark|unique_stations|      avg_latitude|    stddev_longitude|
+-------------+---------------+------------------+--------------------+
|    Palo Alto|              5|       37.43837458|0.012413503945301538|
|San Francisco|             35| 37.78774634285713| 0.00828639823460809|
|     San Jose|             16| 37.33641356249999|0.007323157156125444|
| Redwood City|              7| 37.48628457142858|0.010656859653133309|
|Mountain View|              7|37.395347571428566| 0.01593977855047134|
+-------------+---------------+------------------+--------------------+



station_id,name,lat,long,dockcount,landmark,installation,clean_name,second_word,name_words,has_station,null_if_demo,coalesced_name
2,San Jose Diridon Caltrain Station,37.329732,-121.901782,27,San Jose,8/6/2013,San Jose Diridon Caltrain Station,Jose,"List(San, Jose, Diridon, Caltrain, Station)",True,San Jose Diridon Caltrain Station,San Jose Diridon Caltrain Station
3,San Jose Civic Center,37.330698,-121.888979,15,San Jose,8/5/2013,San Jose Civic Center,Jose,"List(San, Jose, Civic, Center)",False,San Jose Civic Center,San Jose Civic Center
4,Santa Clara at Almaden,37.333988,-121.894902,11,San Jose,8/6/2013,Santa Clara at Almaden,Clara,"List(Santa, Clara, at, Almaden)",False,Santa Clara at Almaden,Santa Clara at Almaden
5,Adobe on Almaden,37.331415,-121.8932,19,San Jose,8/5/2013,Adobe on Almaden,on,"List(Adobe, on, Almaden)",False,Adobe on Almaden,Adobe on Almaden
6,San Pedro Square,37.336721,-121.894074,15,San Jose,8/7/2013,San Pedro Square,Pedro,"List(San, Pedro, Square)",False,San Pedro Square,San Pedro Square
7,Paseo de San Antonio,37.333798,-121.886943,15,San Jose,8/7/2013,Paseo de San Antonio,de,"List(Paseo, de, San, Antonio)",False,Paseo de San Antonio,Paseo de San Antonio
8,San Salvador at 1st,37.330165,-121.885831,15,San Jose,8/5/2013,San Salvador at 1st,Salvador,"List(San, Salvador, at, 1st)",False,San Salvador at 1st,San Salvador at 1st
9,Japantown,37.348742,-121.894715,15,San Jose,8/5/2013,Japantown,,List(Japantown),False,Japantown,Japantown
10,San Jose City Hall,37.337391,-121.886995,15,San Jose,8/6/2013,San Jose City Hall,Jose,"List(San, Jose, City, Hall)",False,San Jose City Hall,San Jose City Hall
11,MLK Library,37.335885,-121.88566,19,San Jose,8/6/2013,MLK Library,Library,"List(MLK, Library)",False,MLK Library,MLK Library


root
 |-- station_id: string (nullable = true)
 |-- name: string (nullable = false)
 |-- lat: string (nullable = true)
 |-- long: string (nullable = true)
 |-- dockcount: string (nullable = true)
 |-- landmark: string (nullable = true)
 |-- installation: string (nullable = true)
 |-- clean_name: string (nullable = false)
 |-- second_word: string (nullable = false)
 |-- name_words: array (nullable = false)
 |    |-- element: string (containsNull = false)
 |-- has_station: boolean (nullable = false)
 |-- null_if_demo: string (nullable = true)
 |-- coalesced_name: string (nullable = false)



ZADANIE 2

In [0]:
def shift_lat(lat):
    return lat + 0.5 if lat is not None else None

shift_lat_udf = udf(shift_lat, DoubleType())
df = df.withColumn("shifted_latitude", shift_lat_udf("lat"))
df = df.withColumn("lat_double", col("lat").cast("double"))
df = df.withColumn("shifted_latitude", shift_lat_udf("lat_double"))
display(df.select("lat", "lat_double", "shifted_latitude"))


lat,lat_double,shifted_latitude
37.329732,37.329732,37.829732
37.330698,37.330698,37.830698
37.333988,37.333988,37.833988
37.331415,37.331415,37.831415
37.336721,37.336721,37.836721
37.333798,37.333798,37.833798
37.330165,37.330165,37.830165
37.348742,37.348742,37.848742
37.337391,37.337391,37.837391
37.335885,37.335885,37.835885


In [0]:

@pandas_udf(StringType())
def normalize_station_name(name: pd.Series) -> pd.Series:
    return name.str.lower().str.replace("san", "", regex=False).str.strip()

df = df.withColumn("normalized_name", normalize_station_name("name"))


display(df.select("name", "normalized_name"))


name,normalized_name
San Jose Diridon Caltrain Station,jose diridon caltrain station
San Jose Civic Center,jose civic center
Santa Clara at Almaden,ta clara at almaden
Adobe on Almaden,adobe on almaden
San Pedro Square,pedro square
Paseo de San Antonio,paseo de antonio
San Salvador at 1st,salvador at 1st
Japantown,japantown
San Jose City Hall,jose city hall
MLK Library,mlk library
