In [30]:
from pyspark.sql import SparkSession
import getpass
# Resolve the OS account running the kernel so the warehouse path follows the
# current user instead of being tied to one hard-coded account.
username = getpass.getuser()

# Build (or reuse, if one already exists) a Hive-enabled session on YARN.
spark = (
    SparkSession.builder
    .config("spark.ui.port", "0")  # 0 asks for any free port for the Spark UI
    .config("spark.sql.warehouse.dir", f"/user/{username}/warehouse")
    .enableHiveSupport()
    .master("yarn")
    .getOrCreate()
)

In [31]:
# Evaluate the session as the cell's last expression so the notebook renders it.
spark

### 1. Creating a DataFrame from the data below

In [4]:
# (season, windspeed) rows, built by pairing the two columns positionally.
weather = list(zip(
    ("Spring", "Summer", "Autumn", "Winter"),
    (12.3, 10.5, 8.2, 15.1),
))

In [5]:
# Name the columns at creation time; column types are still inferred from the tuples.
weather_table = spark.createDataFrame(weather, ["season", "windspeed"])

In [6]:
# Print the DataFrame rows as a text table to verify the column names and values.
weather_table.show()

+------+---------+
|season|windspeed|
+------+---------+
|Spring|     12.3|
|Summer|     10.5|
|Autumn|      8.2|
|Winter|     15.1|
+------+---------+



In [7]:
# Shell escape: list the dataset in HDFS to confirm it exists before reading it.
! hadoop fs -ls /public/****/datasets/library_data.json

-rw-r--r--   3 itv005857 supergroup        925 2023-05-23 13:05 /public/****/datasets/library_data.json


In [8]:
# Shell escape: print the raw file to inspect the JSON structure before loading it.
! hadoop fs -cat /public/****/datasets/library_data.json

{"library_name": "Central Library","location": "City Center","books": [{"book_id": "B001","book_name": "The Great Gatsby","author": "F. Scott Fitzgerald","copies_available": 5},{"book_id": "B002","book_name": "To Kill a Mockingbird","author": "Harper Lee","copies_available": 3}],"members": [{"member_id": "M001","member_name": "John Smith","age": 28,"books_borrowed": ["B001"]},{"member_id": "M002","member_name": "Emma Johnson","age": 35,"books_borrowed": []}]},
{"library_name": "Community Library","location": "Suburb","books": [{"book_id": "B003","book_name": "1984","author": "George Orwell","copies_available": 2},{"book_id": "B004","book_name": "Pride and Prejudice","author": "Jane Austen","copies_available": 4}],"members": [{"member_id": "M003","member_name": "Michael Brown","age": 42,"books_borrowed": ["B003","B004"]},{"member_id": "M004","member_name": "Sophia Davis","age": 31,"books_borrowed": ["B004"]}]}


### 2. Loading the data and enforcing a schema using StructType

In [32]:
# Read the JSON file and let Spark infer the nested schema.
# The JSON source infers a schema whenever none is supplied; "header" and
# "inferSchema" are CSV options with no effect on JSON, so they are not set.
df = (
    spark.read
    .format("json")
    .load("/public/****/datasets/library_data.json")
)

In [33]:
# Preview the loaded rows; nested array/struct columns are truncated in the output.
df.show()

+--------------------+-----------------+-----------+--------------------+
|               books|     library_name|   location|             members|
+--------------------+-----------------+-----------+--------------------+
|[{F. Scott Fitzge...|  Central Library|City Center|[{28, [B001], M00...|
|[{George Orwell, ...|Community Library|     Suburb|[{42, [B003, B004...|
+--------------------+-----------------+-----------+--------------------+



In [34]:
# Print the inferred schema tree; it is the reference for the explicit StructType built later.
df.printSchema()

root
 |-- books: array (nullable = true)
 |    |-- element: struct (containsNull = true)
 |    |    |-- author: string (nullable = true)
 |    |    |-- book_id: string (nullable = true)
 |    |    |-- book_name: string (nullable = true)
 |    |    |-- copies_available: long (nullable = true)
 |-- library_name: string (nullable = true)
 |-- location: string (nullable = true)
 |-- members: array (nullable = true)
 |    |-- element: struct (containsNull = true)
 |    |    |-- age: long (nullable = true)
 |    |    |-- books_borrowed: array (nullable = true)
 |    |    |    |-- element: string (containsNull = true)
 |    |    |-- member_id: string (nullable = true)
 |    |    |-- member_name: string (nullable = true)



In [35]:
from pyspark.sql.types import *

##### StructType

In [46]:
# Explicit schema for library_data.json, written to match the structure that
# df.printSchema() reported for the inferred load.

# Element type of the "books" array.
book_struct = StructType([
    StructField("author", StringType()),
    StructField("book_id", StringType()),
    StructField("book_name", StringType()),
    StructField("copies_available", LongType()),
])

# Element type of the "members" array.
member_struct = StructType([
    StructField("age", LongType()),
    StructField("books_borrowed", ArrayType(StringType())),
    StructField("member_id", StringType()),
    StructField("member_name", StringType()),
])

# Top-level record: one row per library.
book_head = StructType([
    StructField("books", ArrayType(book_struct)),
    StructField("library_name", StringType()),
    StructField("location", StringType()),
    StructField("members", ArrayType(member_struct)),
])

In [47]:
# Load the same file again, this time applying the hand-written schema
# instead of relying on inference.
book_df = spark.read.schema(book_head).json("/public/****/datasets/library_data.json")

In [48]:
# Preview the rows loaded with the explicit schema.
book_df.show()

+--------------------+-----------------+-----------+--------------------+
|               books|     library_name|   location|             members|
+--------------------+-----------------+-----------+--------------------+
|[{F. Scott Fitzge...|  Central Library|City Center|[{28, [B001], M00...|
|[{George Orwell, ...|Community Library|     Suburb|[{42, [B003, B004...|
+--------------------+-----------------+-----------+--------------------+



In [49]:
# Print the schema of the explicitly-typed load so it can be compared with the inferred one.
book_df.printSchema()

root
 |-- books: array (nullable = true)
 |    |-- element: struct (containsNull = true)
 |    |    |-- author: string (nullable = true)
 |    |    |-- book_id: string (nullable = true)
 |    |    |-- book_name: string (nullable = true)
 |    |    |-- copies_available: long (nullable = true)
 |-- library_name: string (nullable = true)
 |-- location: string (nullable = true)
 |-- members: array (nullable = true)
 |    |-- element: struct (containsNull = true)
 |    |    |-- age: long (nullable = true)
 |    |    |-- books_borrowed: array (nullable = true)
 |    |    |    |-- element: string (containsNull = true)
 |    |    |-- member_id: string (nullable = true)
 |    |    |-- member_name: string (nullable = true)



### 3a. Create a DataFrame from the data below and drop the columns passenger_name and age

In [53]:
# Load train.csv, treating the first line as the header row and inferring column types.
# NOTE: this rebinds `df`, which earlier cells used for the library JSON data.
df = spark.read.csv(
    "/public/****/datasets/train.csv",
    header=True,
    inferSchema=True,
)

In [55]:
# Preview only the first two rows of the train data.
df.show(2)

+------------+----------+---------------+--------------+---+-------------+-----------+
|train_number|train_name|seats_available|passenger_name|age|ticket_number|seat_number|
+------------+----------+---------------+--------------+---+-------------+-----------+
|         123|   Express|            100|          John| 25|         T123|         A1|
|         123|   Express|            100|          Emma| 30|         T124|         B2|
+------------+----------+---------------+--------------+---+-------------+-----------+
only showing top 2 rows



In [56]:
# Print the column names and the types inferred from the CSV.
df.printSchema()

root
 |-- train_number: integer (nullable = true)
 |-- train_name: string (nullable = true)
 |-- seats_available: integer (nullable = true)
 |-- passenger_name: string (nullable = true)
 |-- age: integer (nullable = true)
 |-- ticket_number: string (nullable = true)
 |-- seat_number: string (nullable = true)



In [60]:
# Columns removed for this exercise; drop() returns a new DataFrame and leaves `df` unchanged.
dropped_columns = ("passenger_name", "age")
new_df1 = df.drop(*dropped_columns)

In [61]:
# Show the remaining columns after the drop.
new_df1.show()

+------------+----------+---------------+-------------+-----------+
|train_number|train_name|seats_available|ticket_number|seat_number|
+------------+----------+---------------+-------------+-----------+
|         123|   Express|            100|         T123|         A1|
|         123|   Express|            100|         T124|         B2|
|         456| Superfast|            150|         T125|         C3|
|         456| Superfast|            150|         T126|         D4|
|         789|     Local|             50|         T127|         E5|
|         789|     Local|             50|         T128|         F6|
|         789|     Local|             50|         T129|         G7|
+------------+----------+---------------+-------------+-----------+



### 3b. Remove rows that are duplicates on the columns train_number and ticket_number

In [76]:
# Keep a single row for every (train_number, ticket_number) combination.
dedup_keys = ["train_number", "ticket_number"]
new_df2 = new_df1.drop_duplicates(dedup_keys)

In [77]:
# Show the de-duplicated rows.
new_df2.show()

+------------+----------+---------------+-------------+-----------+
|train_number|train_name|seats_available|ticket_number|seat_number|
+------------+----------+---------------+-------------+-----------+
|         789|     Local|             50|         T128|         F6|
|         123|   Express|            100|         T124|         B2|
|         123|   Express|            100|         T123|         A1|
|         456| Superfast|            150|         T126|         D4|
|         456| Superfast|            150|         T125|         C3|
|         789|     Local|             50|         T127|         E5|
|         789|     Local|             50|         T129|         G7|
+------------+----------+---------------+-------------+-----------+



In [78]:
# Count the rows that remain after de-duplication and report the total.
number_rows = new_df2.count()
print("Number of rows:",number_rows)

Number of rows: 7


### 3c. Number of unique train names

In [80]:
# Keep one row per distinct train_name; the resulting row count is therefore
# the number of unique train names.
train_names = new_df1.dropDuplicates(["train_name"])
number_train_name = train_names.count()
print("Number of unique train names:",number_train_name)

Number of unique train names: 3
