In [1]:
from pyspark.sql import SparkSession , Row
from pyspark.sql import functions as F
from pyspark.ml import feature as MF


# Build (or reuse) the notebook's SparkSession with explicit executor/driver
# sizing and a fixed shuffle-partition count.
spark = (
    SparkSession.builder
    .appName("SparkReadOperations")
    .config("spark.executor.cores", "12")
    .config("spark.executor.memory", "4g")
    .config("spark.driver.memory", "2g")
    .config("spark.sql.shuffle.partitions", "12")
    .getOrCreate()
)

# Only log warnings and above from Spark.
spark.sparkContext.setLogLevel("WARN")


In [12]:
# Load the Spaceship Titanic training set: the first row is the header and
# column types are inferred from the data.
titanic_csv_path = r'C:\Users\MdMuntasirulHoque\Downloads\Code (1)\Code\class_5_Spark_ETL_Extension_Class\class_4\data\spaceship-titanic\train.csv'
df = spark.read.csv(
    titanic_csv_path,
    header=True,
    inferSchema=True,
)

In [13]:
# Peek at the first two rows of the loaded frame.
df.show(n=2)

+-----------+----------+---------+-----+-----------+----+-----+-----------+---------+------------+-----+------+---------------+-----------+
|PassengerId|HomePlanet|CryoSleep|Cabin|Destination| Age|  VIP|RoomService|FoodCourt|ShoppingMall|  Spa|VRDeck|           Name|Transported|
+-----------+----------+---------+-----+-----------+----+-----+-----------+---------+------------+-----+------+---------------+-----------+
|    0001_01|    Europa|    false|B/0/P|TRAPPIST-1e|39.0|false|        0.0|      0.0|         0.0|  0.0|   0.0|Maham Ofracculy|      false|
|    0002_01|     Earth|    false|F/0/S|TRAPPIST-1e|24.0|false|      109.0|      9.0|        25.0|549.0|  44.0|   Juanna Vines|       true|
+-----------+----------+---------+-----+-----------+----+-----+-----------+---------+------------+-----+------+---------------+-----------+
only showing top 2 rows



In [14]:
# Print the column names and the types inferred while reading the CSV.
df.printSchema()

root
 |-- PassengerId: string (nullable = true)
 |-- HomePlanet: string (nullable = true)
 |-- CryoSleep: boolean (nullable = true)
 |-- Cabin: string (nullable = true)
 |-- Destination: string (nullable = true)
 |-- Age: double (nullable = true)
 |-- VIP: boolean (nullable = true)
 |-- RoomService: double (nullable = true)
 |-- FoodCourt: double (nullable = true)
 |-- ShoppingMall: double (nullable = true)
 |-- Spa: double (nullable = true)
 |-- VRDeck: double (nullable = true)
 |-- Name: string (nullable = true)
 |-- Transported: boolean (nullable = true)



In [15]:
# "Cabin" holds three "/"-separated parts (e.g. "B/0/P"). Split it once, expose
# the parts as Deck / Cabin_num / Side, drop the original column, and recode
# the boolean CryoSleep and VIP flags as integers.
cabin_parts = F.split(F.col("Cabin"), "/")

df = (
    df
    .withColumn("Deck", cabin_parts.getItem(0))
    .withColumn("Cabin_num", cabin_parts.getItem(1))
    .withColumn("Side", cabin_parts.getItem(2))
    .drop("Cabin")
    .withColumn("CryoSleep", F.col("CryoSleep").cast("int"))
    .withColumn("VIP", F.col("VIP").cast("int"))
)

In [16]:
# For every row, count how many of its columns are null, then tabulate how
# many rows have 0, 1, 2, ... nulls.
null_flags = [
    F.when(F.col(column_name).isNull(), 1).otherwise(0)
    for column_name in df.columns
]
# Python's built-in sum() folds the per-column 0/1 flags into one Column.
rows_with_null_count = df.withColumn("null_count", sum(null_flags))
rows_with_null_count.groupBy("null_count").count().orderBy("null_count").show()

+----------+-----+
|null_count|count|
+----------+-----+
|         0| 6606|
|         1| 1709|
|         2|  167|
|         3|  170|
|         4|   36|
|         5|    5|
+----------+-----+



In [20]:
def get_shape(df):
    """Return ``(row_count, column_count)`` for ``df``.

    ``df`` must provide a ``count()`` method and a ``columns`` sequence.
    Rows are counted by calling ``df.count()``; columns by ``len(df.columns)``.
    """
    n_rows = df.count()
    n_cols = len(df.columns)
    return n_rows, n_cols
# Report the current frame's dimensions as (rows, columns).
shape = get_shape(df)
print(f"Shape: {shape}")

Shape: (8693, 16)


In [62]:
# Load the bike-trip data set (header row, inferred column types). This rebinds
# `df`, replacing the frame used by the earlier cells.
trip_csv_path = r"C:\Users\MdMuntasirulHoque\Downloads\Code (1)\Code\class_5_Spark_ETL_Extension_Class\class_4\data\201508_trip_data.csv"
df = spark.read.csv(
    trip_csv_path,
    header=True,
    inferSchema=True,
)

In [63]:
# Print the inferred schema; the two date columns are read as plain strings.
df.printSchema()

root
 |-- Trip ID: integer (nullable = true)
 |-- Duration: integer (nullable = true)
 |-- Start Date: string (nullable = true)
 |-- Start Station: string (nullable = true)
 |-- Start Terminal: integer (nullable = true)
 |-- End Date: string (nullable = true)
 |-- End Station: string (nullable = true)
 |-- End Terminal: integer (nullable = true)
 |-- Bike #: integer (nullable = true)
 |-- Subscriber Type: string (nullable = true)
 |-- Zip Code: string (nullable = true)



In [64]:
# Preview the first rows of the trip data (show() prints 20 rows by default).
df.show()

+-------+--------+---------------+--------------------+--------------+---------------+--------------------+------------+------+---------------+--------+
|Trip ID|Duration|     Start Date|       Start Station|Start Terminal|       End Date|         End Station|End Terminal|Bike #|Subscriber Type|Zip Code|
+-------+--------+---------------+--------------------+--------------+---------------+--------------------+------------+------+---------------+--------+
| 913460|     765|8/31/2015 23:26|Harry Bridges Pla...|            50|8/31/2015 23:39|San Francisco Cal...|          70|   288|     Subscriber|    2139|
| 913459|    1036|8/31/2015 23:11|San Antonio Shopp...|            31|8/31/2015 23:28|Mountain View Cit...|          27|    35|     Subscriber|   95032|
| 913455|     307|8/31/2015 23:13|      Post at Kearny|            47|8/31/2015 23:18|   2nd at South Park|          64|   468|     Subscriber|   94107|
| 913454|     409|8/31/2015 23:10|  San Jose City Hall|            10|8/31/2015 23

In [65]:
# Convert both "M/d/yyyy HH:mm" string columns to dates in place; only the
# calendar date is kept, the time of day is dropped.
trip_date_format = "M/d/yyyy HH:mm"
for date_column in ("Start Date", "End Date"):
    df = df.withColumn(date_column, F.to_date(date_column, trip_date_format))

In [66]:
# Show the first 100 rows to check the parsed date columns.
df.show(n=100)

+-------+--------+----------+--------------------+--------------+----------+--------------------+------------+------+---------------+--------+
|Trip ID|Duration|Start Date|       Start Station|Start Terminal|  End Date|         End Station|End Terminal|Bike #|Subscriber Type|Zip Code|
+-------+--------+----------+--------------------+--------------+----------+--------------------+------------+------+---------------+--------+
| 913460|     765|2015-08-31|Harry Bridges Pla...|            50|2015-08-31|San Francisco Cal...|          70|   288|     Subscriber|    2139|
| 913459|    1036|2015-08-31|San Antonio Shopp...|            31|2015-08-31|Mountain View Cit...|          27|    35|     Subscriber|   95032|
| 913455|     307|2015-08-31|      Post at Kearny|            47|2015-08-31|   2nd at South Park|          64|   468|     Subscriber|   94107|
| 913454|     409|2015-08-31|  San Jose City Hall|            10|2015-08-31| San Salvador at 1st|           8|    68|     Subscriber|   95113|

In [67]:
# Parse both trip timestamps into dates, each from its OWN column.
# "End Date" must be derived from "End Date": deriving it from "Start Date"
# overwrites every trip's end date with its start date, which forces the
# "Days Diff" column computed afterwards to 0 for every row.
# NOTE(review): this cell repeats the conversion already done in the earlier
# date-parsing cell; it looks redundant and can probably be removed — confirm.
df = df.withColumn(
    "Start Date",
    F.to_date("Start Date", "M/d/yyyy HH:mm")
).withColumn(
    "End Date",
    F.to_date("End Date", "M/d/yyyy HH:mm")
)

In [68]:
# Whole days elapsed between the start and the end of each trip.
# date_diff takes its arguments as (end, start) and returns end - start, so the
# end column goes first; with the arguments swapped, multi-day trips would
# get a negative difference.
df = df.withColumn(
    "Days Diff",
    F.date_diff("End Date", "Start Date")
)

In [69]:
# Check the new "Days Diff" column on the first two rows.
df.show(n=2)

+-------+--------+----------+--------------------+--------------+----------+--------------------+------------+------+---------------+--------+---------+
|Trip ID|Duration|Start Date|       Start Station|Start Terminal|  End Date|         End Station|End Terminal|Bike #|Subscriber Type|Zip Code|Days Diff|
+-------+--------+----------+--------------------+--------------+----------+--------------------+------------+------+---------------+--------+---------+
| 913460|     765|2015-08-31|Harry Bridges Pla...|            50|2015-08-31|San Francisco Cal...|          70|   288|     Subscriber|    2139|        0|
| 913459|    1036|2015-08-31|San Antonio Shopp...|            31|2015-08-31|Mountain View Cit...|          27|    35|     Subscriber|   95032|        0|
+-------+--------+----------+--------------------+--------------+----------+--------------------+------------+------+---------------+--------+---------+
only showing top 2 rows



In [71]:
# Replace "Start Station" with the position of the substring "Bridges" in the
# station name (0 means "not found"), keep only the rows where it was found,
# and list the distinct positions. `df` itself is not reassigned.
bridges_position = F.locate("Bridges", "Start Station")
(
    df.withColumn("Start Station", bridges_position)
    .filter(F.col("Start Station") != 0)
    .select("Start Station")
    .distinct()
    .show()
)

+-------------+
|Start Station|
+-------------+
|            7|
+-------------+



In [72]:
# Find where "Bridges" occurs inside each start-station name, drop the rows
# where it does not occur (position 0), and print the distinct positions.
located = df.withColumn("Start Station", F.locate("Bridges", "Start Station"))
matches = located.where(F.col("Start Station") != 0)
matches.select("Start Station").distinct().show()

+-------------+
|Start Station|
+-------------+
|            7|
+-------------+



In [None]:
# df=df.withColumn("geohash5", substring(col("geohash"),1,5)
#                 ).withColumn("geohash4", substring(col("geohash"),1,4)
#                             ).withColumn("geohash3", substring(col("geohash"),1,3))

In [74]:
# Load the geohash lookup table (header row, inferred column types). This
# rebinds `df`, replacing the trip data used above.
lookup_csv_path = r'C:\Users\MdMuntasirulHoque\OneDrive - ADA Global\IMPORTANT FILE\POI DATA\IMPORTANT\BD_lookup_table.csv'
df = spark.read.csv(
    lookup_csv_path,
    header=True,
    inferSchema=True,
)

In [76]:
# Print the inferred schema of the lookup table.
df.printSchema()

root
 |-- geohash: string (nullable = true)
 |-- country: string (nullable = true)
 |-- division: string (nullable = true)
 |-- district: string (nullable = true)
 |-- thana: string (nullable = true)
 |-- union: string (nullable = true)



In [78]:
# List each division name that occurs in the lookup table.
division_names = df.select("division").distinct()
division_names.show()

+----------+
|  division|
+----------+
|     Dhaka|
|    Khulna|
|  Rajshahi|
|   Barisal|
|Chittagong|
|    Sylhet|
|Mymensingh|
|   Rangpur|
+----------+



In [80]:
from pyspark.sql.functions import col, substring

# Add coarser geohash columns by keeping only the first 5, 4 and 3 characters
# of "geohash" (substring positions are 1-based).
prefix_lengths = {"geohash5": 5, "geohash4": 4, "geohash3": 3}
for prefix_column, length in prefix_lengths.items():
    df = df.withColumn(prefix_column, substring(col("geohash"), 1, length))

In [81]:
# Preview the lookup table with the new geohash prefix columns.
df.show()

+-------+----------+--------+--------+----------+---------------+--------+--------+--------+
|geohash|   country|division|district|     thana|          union|geohash5|geohash4|geohash3|
+-------+----------+--------+--------+----------+---------------+--------+--------+--------+
| tgzq6c|Bangladesh|  Khulna|Satkhira|Shyamnagar| Satkhira Range|   tgzq6|    tgzq|     tgz|
| tgzqke|Bangladesh|  Khulna|Satkhira|Shyamnagar|   Ramjan Nagar|   tgzqk|    tgzq|     tgz|
| tgzqud|Bangladesh|  Khulna|Satkhira|Shyamnagar|   Ramjan Nagar|   tgzqu|    tgzq|     tgz|
| tgzr0w|Bangladesh|  Khulna|Satkhira|  Kaliganj|       Ratanpur|   tgzr0|    tgzr|     tgz|
| tgzr29|Bangladesh|  Khulna|Satkhira|  Kaliganj|      Dhalbaria|   tgzr2|    tgzr|     tgz|
| tgzrfd|Bangladesh|  Khulna|Satkhira|  Kaliganj|Dakshin Sreepur|   tgzrf|    tgzr|     tgz|
| tgzrne|Bangladesh|  Khulna|Satkhira|Shyamnagar|         Atulia|   tgzrn|    tgzr|     tgz|
| tgzrny|Bangladesh|  Khulna|Satkhira|Shyamnagar|         Atulia|   tg

In [86]:
# List of divisions to create indicator columns for
divisions = ["Dhaka", "Chittagong", "Khulna", "Rajshahi", "Barisal", "Sylhet", "Mymensingh", "Rangpur"]

# Add one 0/1 indicator column per division, named after the division.
# The helpers are reached through the `F` alias (pyspark.sql.functions) from
# the notebook's first cell: bare `when` / `lit` are not imported by any
# visible cell, so using them would raise NameError on a fresh kernel.
for div in divisions:
    df = df.withColumn(
        div,
        F.when(F.col("division") == div, F.lit(1)).otherwise(F.lit(0)),
    )


In [87]:
# Preview the frame with the per-division indicator columns appended.
df.show()

+-------+----------+--------+--------+----------+---------------+--------+--------+--------+-----+----------+------+--------+-------+------+----------+-------+
|geohash|   country|division|district|     thana|          union|geohash5|geohash4|geohash3|Dhaka|Chittagong|Khulna|Rajshahi|Barisal|Sylhet|Mymensingh|Rangpur|
+-------+----------+--------+--------+----------+---------------+--------+--------+--------+-----+----------+------+--------+-------+------+----------+-------+
| tgzq6c|Bangladesh|  Khulna|Satkhira|Shyamnagar| Satkhira Range|   tgzq6|    tgzq|     tgz|    0|         0|     1|       0|      0|     0|         0|      0|
| tgzqke|Bangladesh|  Khulna|Satkhira|Shyamnagar|   Ramjan Nagar|   tgzqk|    tgzq|     tgz|    0|         0|     1|       0|      0|     0|         0|      0|
| tgzqud|Bangladesh|  Khulna|Satkhira|Shyamnagar|   Ramjan Nagar|   tgzqu|    tgzq|     tgz|    0|         0|     1|       0|      0|     0|         0|      0|
| tgzr0w|Bangladesh|  Khulna|Satkhira|  

In [None]:
# Count distinct 'ifa' values per 'data_usage' group, largest group first, and
# print the top four rows without truncating cell contents.
# NOTE(review): `final_df` is not defined in any cell shown here, and the cell
# has no execution count — confirm where it comes from before running.
output = (
    final_df
    .groupBy('data_usage')
    .agg(F.countDistinct('ifa').alias('count'))
    .orderBy('count', ascending=False)
)
output.show(4, truncate=False)