# **Session - 5**: Spark Higher-Level API
## **Agenda**
- Why DataFrames (higher-level API) over RDDs
- Intro to DataFrames and Spark SQL
- Creating DataFrames
- Basic operations
- Transformations
- Writing DataFrames
- Working with Spark SQL: managed views/tables and metadata
- Databricks in the cloud


In [1]:
# SparkSession boilerplate code

from pyspark.sql import SparkSession

try:
    # Create a Spark session on YARN with Hive support.
    # spark.ui.port=0 lets Spark pick any free port for the UI.
    spark = SparkSession.builder \
        .appName("MySparkApp") \
        .config('spark.ui.port', '0') \
        .config("spark.sql.warehouse.dir", "/user/tplbigdattrain/datasets/dsml-8/Spark-session/Spark-DF-API/warehouse") \
        .enableHiveSupport() \
        .master('yarn') \
        .getOrCreate()

    print("Spark Session created successfully.")

except Exception as e:
    # Report the failure instead of letting the notebook cell crash
    print("An error occurred while creating the Spark session:")
    print(str(e))


2024-09-08 06:08:20,765 WARN util.NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
2024-09-08 06:08:22,220 WARN yarn.Client: Neither spark.yarn.jars nor spark.yarn.archive is set, falling back to uploading libraries under SPARK_HOME.


Spark Session created successfully.


In [2]:
spark

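## Why DataFrames (Higher-Level API) over RDDs?
1. **Schema awareness**: every column has a name and a data type, so Spark knows the structure of the data.
2. **Optimized**: queries are planned by the Catalyst optimizer and executed by the Tungsten engine.
3. **Easy to use**: table-style operations (`select`, `where`, `groupBy`) replace hand-written `map`/`reduce` logic.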


## **DataFrames**

### Introduction
DataFrames in Spark are distributed collections of data organized into named columns, similar to tables in relational databases. They provide a higher-level API for manipulating structured data and are integrated with Spark’s Catalyst optimizer and Tungsten execution engine, offering significant performance benefits.

#### Key Features

1. **Schema**: DataFrames have a schema, which defines the names and types of the columns.
2. **API**: They offer a rich API for performing operations such as filtering, grouping, aggregating, and joining.
3. **Optimization**: DataFrames benefit from Spark’s Catalyst optimizer for query optimization and Tungsten execution engine for efficient computation.
4. **Interoperability**: They can be created from various data sources like JSON, CSV, Parquet, Avro, and Hive.

### Note: DataFrames -> Temporary Nature
- DataFrames are temporary and exist only for the duration of the Spark session. Once the application is closed, the DataFrame is no longer available unless it was written to storage or saved as a table.


---

## **Spark SQL**

### Introduction
Spark SQL allows users to query structured data using SQL syntax. It is tightly integrated with the Spark DataFrame API, enabling users to leverage SQL queries for data processing.

### Features
- **SQL Queries**: Perform operations using familiar SQL syntax.
- **Temporary Views**: Create temporary views from DataFrames to run SQL queries.
- **Optimization**: Benefit from Spark’s query optimization mechanisms.

### Example
1. **Creating a Temporary View**: Create a temporary view from a DataFrame.

   ```df.createOrReplaceTempView("temp_view_name")```


2. **Running SQL Queries**: Execute SQL queries on the temporary view.

   ```spark.sql("SELECT * FROM temp_view_name WHERE age > 21").show()```

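### Note: SQL tables -> Persistent Tables
- Spark tables, unlike temporary views, are persistent and accessible across different sessions. Their metadata is stored in a metastore and their data in the warehouse directory, making them permanent and reusable. A temporary view disappears when the session that created it ends.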

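## Various ways to create a DataFrame

## 1. DF from txt files

`dsml-8/students_data.txt` is not directly usable as a structured DataFrame source: a plain-text file carries no column structure, so it is first loaded as an RDD and then converted.
- Compatible file formats:
1. csv
2. parquet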

### RDD to DF

In [3]:
sc = spark.sparkContext
path='dsml-8/students_data.txt'
students_data_rdd = sc.textFile(path)
students_data_rdd.collect()

                                                                                

['101 A Rohit Gurgaon 65 77 43 66 87',
 '102 B Akansha Delhi 55 46 24 66 77',
 '103 A Himanshu Faridabad 75 38 84 38 58',
 '104 A Ekta Delhi 85 84 39 58 85',
 '105 B Deepanshu Gurgaon 34 55 56 23 66',
 '106 B Ayush Delhi 66 62 98 74 87',
 '107 B Aditi Delhi 76 83 75 38 58',
 '108 A Sahil Faridabad 55 32 43 56 66',
 '109 A Krati Delhi 34 53 25 67 75']

In [7]:
std_rdd_map = students_data_rdd.map(lambda x:x.split(' '))
std_rdd_map.collect()

[['101', 'A', 'Rohit', 'Gurgaon', '65', '77', '43', '66', '87'],
 ['102', 'B', 'Akansha', 'Delhi', '55', '46', '24', '66', '77'],
 ['103', 'A', 'Himanshu', 'Faridabad', '75', '38', '84', '38', '58'],
 ['104', 'A', 'Ekta', 'Delhi', '85', '84', '39', '58', '85'],
 ['105', 'B', 'Deepanshu', 'Gurgaon', '34', '55', '56', '23', '66'],
 ['106', 'B', 'Ayush', 'Delhi', '66', '62', '98', '74', '87'],
 ['107', 'B', 'Aditi', 'Delhi', '76', '83', '75', '38', '58'],
 ['108', 'A', 'Sahil', 'Faridabad', '55', '32', '43', '56', '66'],
 ['109', 'A', 'Krati', 'Delhi', '34', '53', '25', '67', '75']]

In [8]:
std_df = std_rdd_map.toDF(['rollno','section','name','city','s1','s2','s3','s4','s5'])

In [9]:
std_df.show()

+------+-------+---------+---------+---+---+---+---+---+
|rollno|section|     name|     city| s1| s2| s3| s4| s5|
+------+-------+---------+---------+---+---+---+---+---+
|   101|      A|    Rohit|  Gurgaon| 65| 77| 43| 66| 87|
|   102|      B|  Akansha|    Delhi| 55| 46| 24| 66| 77|
|   103|      A| Himanshu|Faridabad| 75| 38| 84| 38| 58|
|   104|      A|     Ekta|    Delhi| 85| 84| 39| 58| 85|
|   105|      B|Deepanshu|  Gurgaon| 34| 55| 56| 23| 66|
|   106|      B|    Ayush|    Delhi| 66| 62| 98| 74| 87|
|   107|      B|    Aditi|    Delhi| 76| 83| 75| 38| 58|
|   108|      A|    Sahil|Faridabad| 55| 32| 43| 56| 66|
|   109|      A|    Krati|    Delhi| 34| 53| 25| 67| 75|
+------+-------+---------+---------+---+---+---+---+---+



In [10]:
std_df.printSchema()

root
 |-- rollno: string (nullable = true)
 |-- section: string (nullable = true)
 |-- name: string (nullable = true)
 |-- city: string (nullable = true)
 |-- s1: string (nullable = true)
 |-- s2: string (nullable = true)
 |-- s3: string (nullable = true)
 |-- s4: string (nullable = true)
 |-- s5: string (nullable = true)



## 2. DF from collection

In [12]:
# Collection
sample_data = [
    (101, "A", "Rohit",    "Gurugram"),
    (102, "B", "Akansha",  "Delhi"),
    (103, "A", "Himanshu", "Faridabad"),
    (104, "A", "Ekta",     "Delhi"),
    (105, "B", "Ayush",    "Delhi")
]

In [14]:
df_from_coll = spark.createDataFrame(data=sample_data,schema=['rollno','section','name','city'])
df_from_coll.show()
df_from_coll.printSchema()

+------+-------+--------+---------+
|rollno|section|    name|     city|
+------+-------+--------+---------+
|   101|      A|   Rohit| Gurugram|
|   102|      B| Akansha|    Delhi|
|   103|      A|Himanshu|Faridabad|
|   104|      A|    Ekta|    Delhi|
|   105|      B|   Ayush|    Delhi|
+------+-------+--------+---------+

root
 |-- rollno: long (nullable = true)
 |-- section: string (nullable = true)
 |-- name: string (nullable = true)
 |-- city: string (nullable = true)



## 3. DF from any file

In [16]:
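path = 'dsml-8/module_8_students_data.csv'
# Caution: this file has no header row, so header=True promotes the first
# record (101, A, Rohit, ...) to column names, as the output below shows.
df_csv1 = spark.read.csv(path,header=True)
df_csv1.show()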

+---+---+---------+---------+---+---+---+---+---+
|101|  A|    Rohit|  Gurgaon| 65| 77| 43| 66| 87|
+---+---+---------+---------+---+---+---+---+---+
|102|  B|  Akansha|    Delhi| 55| 46| 24| 66| 77|
|103|  A| Himanshu|Faridabad| 75| 38| 84| 38| 58|
|104|  A|     Ekta|    Delhi| 85| 84| 39| 58| 85|
|105|  B|Deepanshu|  Gurgaon| 34| 55| 56| 23| 66|
|106|  B|    Ayush|    Delhi| 66| 62| 98| 74| 87|
|107|  B|    Aditi|    Delhi| 76| 83| 75| 38| 58|
|108|  A|    Sahil|Faridabad| 55| 32| 43| 56| 66|
|109|  A|    Krati|    Delhi| 34| 53| 25| 67| 75|
+---+---+---------+---------+---+---+---+---+---+



In [17]:
df_csv2 = spark.read.format("csv").option('header',True).option('inferSchema',True).load(path)
df_csv2.show()

+---+---+---------+---------+---+---+---+---+---+
|101|  A|    Rohit|  Gurgaon| 65| 77| 43| 66| 87|
+---+---+---------+---------+---+---+---+---+---+
|102|  B|  Akansha|    Delhi| 55| 46| 24| 66| 77|
|103|  A| Himanshu|Faridabad| 75| 38| 84| 38| 58|
|104|  A|     Ekta|    Delhi| 85| 84| 39| 58| 85|
|105|  B|Deepanshu|  Gurgaon| 34| 55| 56| 23| 66|
|106|  B|    Ayush|    Delhi| 66| 62| 98| 74| 87|
|107|  B|    Aditi|    Delhi| 76| 83| 75| 38| 58|
|108|  A|    Sahil|Faridabad| 55| 32| 43| 56| 66|
|109|  A|    Krati|    Delhi| 34| 53| 25| 67| 75|
+---+---+---------+---------+---+---+---+---+---+



                                                                                

## Schema Enforcement
### 2 Ways:
1. Explicit schema enforcement (`StructType`, `StructField`, and a data type per column).
2. Implicit schema enforcement (`inferSchema`), followed by renaming the columns.

#### Various data types:
1. IntegerType
2. StringType
3. DateType / TimestampType
4. LongType
In [18]:
import pyspark.sql.types as tp

In [19]:
my_schema = tp.StructType([
    tp.StructField(name= "roll_no", dataType= tp.IntegerType()),
    tp.StructField(name= "section", dataType= tp.StringType()),
    tp.StructField(name= "name",    dataType= tp.StringType()),
    tp.StructField(name= "city",    dataType= tp.StringType()),
    tp.StructField(name= "subject1",dataType= tp.IntegerType()),
    tp.StructField(name= "subject2",dataType= tp.IntegerType()),
    tp.StructField(name= "subject3",dataType= tp.IntegerType()),
    tp.StructField(name= "subject4",dataType= tp.IntegerType()),
    tp.StructField(name= "subject5",dataType= tp.IntegerType()),
])

In [22]:
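# header=True still skips the first line of the file; since the file has no
# header row, record 101 is missing from the result below.
df_csv2 = spark.read.csv(path,header=True,schema=my_schema)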
df_csv2.show()
df_csv2.printSchema()

+-------+-------+---------+---------+--------+--------+--------+--------+--------+
|roll_no|section|     name|     city|subject1|subject2|subject3|subject4|subject5|
+-------+-------+---------+---------+--------+--------+--------+--------+--------+
|    102|      B|  Akansha|    Delhi|      55|      46|      24|      66|      77|
|    103|      A| Himanshu|Faridabad|      75|      38|      84|      38|      58|
|    104|      A|     Ekta|    Delhi|      85|      84|      39|      58|      85|
|    105|      B|Deepanshu|  Gurgaon|      34|      55|      56|      23|      66|
|    106|      B|    Ayush|    Delhi|      66|      62|      98|      74|      87|
|    107|      B|    Aditi|    Delhi|      76|      83|      75|      38|      58|
|    108|      A|    Sahil|Faridabad|      55|      32|      43|      56|      66|
|    109|      A|    Krati|    Delhi|      34|      53|      25|      67|      75|
+-------+-------+---------+---------+--------+--------+--------+--------+--------+

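root
 |-- roll_no: integer (nullable = true)
 |-- section: string (nullable = true)
 |-- name: string (nullable = true)
 |-- city: string (nullable = true)
 |-- subject1: integer (nullable = true)
 |-- subject2: integer (nullable = true)
 |-- subject3: integer (nullable = true)
 |-- subject4: integer (nullable = true)
 |-- subject5: integer (nullable = true)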

In [23]:
df_csv21 = spark.read.format("csv").option('header',True).schema(my_schema).load(path)
df_csv21.show()
df_csv21.printSchema()

+-------+-------+---------+---------+--------+--------+--------+--------+--------+
|roll_no|section|     name|     city|subject1|subject2|subject3|subject4|subject5|
+-------+-------+---------+---------+--------+--------+--------+--------+--------+
|    102|      B|  Akansha|    Delhi|      55|      46|      24|      66|      77|
|    103|      A| Himanshu|Faridabad|      75|      38|      84|      38|      58|
|    104|      A|     Ekta|    Delhi|      85|      84|      39|      58|      85|
|    105|      B|Deepanshu|  Gurgaon|      34|      55|      56|      23|      66|
|    106|      B|    Ayush|    Delhi|      66|      62|      98|      74|      87|
|    107|      B|    Aditi|    Delhi|      76|      83|      75|      38|      58|
|    108|      A|    Sahil|Faridabad|      55|      32|      43|      56|      66|
|    109|      A|    Krati|    Delhi|      34|      53|      25|      67|      75|
+-------+-------+---------+---------+--------+--------+--------+--------+--------+

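root
 |-- roll_no: integer (nullable = true)
 |-- section: string (nullable = true)
 |-- name: string (nullable = true)
 |-- city: string (nullable = true)
 |-- subject1: integer (nullable = true)
 |-- subject2: integer (nullable = true)
 |-- subject3: integer (nullable = true)
 |-- subject4: integer (nullable = true)
 |-- subject5: integer (nullable = true)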

---

<center><h1> DataFrame Operations </h1></center>

---



* 1. **Print Schema**
* 2. **Column Names**
* 3. **Check the Dimensions of the Data**
* 4. **Select Columns**
* 5. **Drop**
* 6. **Retrieve specific records**
* 7. **Add new columns**
* 8. **Sorting**
* 9. **GroupBy & Aggregation Functions**

---

We are going to use the Healthcare Analytics data (**dsml-8/hospital_train.csv**), which has 18 columns:

 - case_id
 - hospital_code
 - hospital_type_code
 - city_code_hospital
 - hospital_region_code
 - extra_room_available
 - department
 - ward_type
 - ward_facility_code
 - bed_grade
 - patient_id
 - city_code_patient
 - admission_type
 - severity_of_illness
 - visitors_with_patient
 - age
 - admission_deposit
 - stay



In [25]:
df=spark.read.format('csv').option('header',True).option('inferSchema',True).load('dsml-8/hospital_train.csv')
df.show(5)
df.printSchema()

+-------+-------------+------------------+------------------+--------------------+---------------------------------+------------+---------+------------------+---------+---------+-----------------+-----------------+-------------------+---------------------+-----+-----------------+-----+
|case_id|Hospital_code|Hospital_type_code|City_Code_Hospital|Hospital_region_code|Available Extra Rooms in Hospital|  Department|Ward_Type|Ward_Facility_Code|Bed Grade|patientid|City_Code_Patient|Type of Admission|Severity of Illness|Visitors with Patient|  Age|Admission_Deposit| Stay|
+-------+-------------+------------------+------------------+--------------------+---------------------------------+------------+---------+------------------+---------+---------+-----------------+-----------------+-------------------+---------------------+-----+-----------------+-----+
|      1|            8|                 c|                 3|                   Z|                                3|radiotherapy|        R|

In [56]:
# DataFrames have no minPartitions attribute; the partition count
# is available through the underlying RDD.
df.rdd.getNumPartitions()

In [26]:
df.columns

['case_id',
 'Hospital_code',
 'Hospital_type_code',
 'City_Code_Hospital',
 'Hospital_region_code',
 'Available Extra Rooms in Hospital',
 'Department',
 'Ward_Type',
 'Ward_Facility_Code',
 'Bed Grade',
 'patientid',
 'City_Code_Patient',
 'Type of Admission',
 'Severity of Illness',
 'Visitors with Patient',
 'Age',
 'Admission_Deposit',
 'Stay']

#### Check the Dimensions of the Data

In [27]:
(df.count(),len(df.columns))

(318438, 18)

#### Select Columns

In [28]:
sample_data = df.select('case_id','patientid','Age','Admission_Deposit','Stay')
sample_data.show()

+-------+---------+-----+-----------------+-----+
|case_id|patientid|  Age|Admission_Deposit| Stay|
+-------+---------+-----+-----------------+-----+
|      1|    31397|51-60|           4911.0| 0-10|
|      2|    31397|51-60|           5954.0|41-50|
|      3|    31397|51-60|           4745.0|31-40|
|      4|    31397|51-60|           7272.0|41-50|
|      5|    31397|51-60|           5558.0|41-50|
|      6|    31397|51-60|           4449.0|11-20|
|      7|    31397|51-60|           6167.0| 0-10|
|      8|    31397|51-60|           5571.0|41-50|
|      9|    31397|51-60|           7223.0|51-60|
|     10|    31397|51-60|           6056.0|31-40|
|     11|    31397|51-60|           5797.0|21-30|
|     12|    31397|51-60|           5993.0|11-20|
|     13|    31397|51-60|           5141.0| 0-10|
|     14|    31397|51-60|           8477.0|21-30|
|     15|    63418|71-80|           2685.0| 0-10|
|     16|    63418|71-80|           9398.0| 0-10|
|     17|    63418|71-80|           2933.0| 0-10|


### Creating a Spark SQL temporary view from a DataFrame

In [31]:
sample_data.createOrReplaceTempView('xyz')  # registers a session-scoped temporary view, not a persistent table

In [32]:
spark.sql('select * from xyz where Admission_Deposit > 3000 limit 5').show()

+-------+---------+-----+-----------------+-----+
|case_id|patientid|  Age|Admission_Deposit| Stay|
+-------+---------+-----+-----------------+-----+
|      1|    31397|51-60|           4911.0| 0-10|
|      2|    31397|51-60|           5954.0|41-50|
|      3|    31397|51-60|           4745.0|31-40|
|      4|    31397|51-60|           7272.0|41-50|
|      5|    31397|51-60|           5558.0|41-50|
+-------+---------+-----+-----------------+-----+



In [33]:
spark.sql('describe table xyz').show()

2024-09-08 04:56:26,925 WARN conf.HiveConf: HiveConf of name hive.stats.jdbc.timeout does not exist
2024-09-08 04:56:26,925 WARN conf.HiveConf: HiveConf of name hive.stats.retries.wait does not exist
2024-09-08 04:56:32,181 WARN metastore.ObjectStore: Version information not found in metastore. hive.metastore.schema.verification is not enabled so recording the schema version 2.3.0
2024-09-08 04:56:32,181 WARN metastore.ObjectStore: setMetaStoreSchemaVersion called but recording version is disabled: version = 2.3.0, comment = Set by MetaStore tplbigdattrain@172.31.16.182
2024-09-08 04:56:32,199 WARN metastore.ObjectStore: Failed to get database default, returning NoSuchObjectException


+-----------------+---------+-------+
|         col_name|data_type|comment|
+-----------------+---------+-------+
|          case_id|      int|   null|
|        patientid|      int|   null|
|              Age|   string|   null|
|Admission_Deposit|   double|   null|
|             Stay|   string|   null|
+-----------------+---------+-------+



2024-09-08 04:56:32,877 WARN metastore.ObjectStore: Failed to get database global_temp, returning NoSuchObjectException


In [37]:
spark.sql('describe formatted xyz').show()

+-----------------+---------+-------+
|         col_name|data_type|comment|
+-----------------+---------+-------+
|          case_id|      int|   null|
|        patientid|      int|   null|
|              Age|   string|   null|
|Admission_Deposit|   double|   null|
|             Stay|   string|   null|
+-----------------+---------+-------+



## Drop column

In [38]:
df.show(5)

+-------+-------------+------------------+------------------+--------------------+---------------------------------+------------+---------+------------------+---------+---------+-----------------+-----------------+-------------------+---------------------+-----+-----------------+-----+
|case_id|Hospital_code|Hospital_type_code|City_Code_Hospital|Hospital_region_code|Available Extra Rooms in Hospital|  Department|Ward_Type|Ward_Facility_Code|Bed Grade|patientid|City_Code_Patient|Type of Admission|Severity of Illness|Visitors with Patient|  Age|Admission_Deposit| Stay|
+-------+-------------+------------------+------------------+--------------------+---------------------------------+------------+---------+------------------+---------+---------+-----------------+-----------------+-------------------+---------------------+-----+-----------------+-----+
|      1|            8|                 c|                 3|                   Z|                                3|radiotherapy|        R|

In [39]:
# Column names are resolved case-insensitively by default, so 'age' drops the 'Age' column.
df_without_age = df.drop('age')
df_without_age.show(5)

+-------+-------------+------------------+------------------+--------------------+---------------------------------+------------+---------+------------------+---------+---------+-----------------+-----------------+-------------------+---------------------+-----------------+-----+
|case_id|Hospital_code|Hospital_type_code|City_Code_Hospital|Hospital_region_code|Available Extra Rooms in Hospital|  Department|Ward_Type|Ward_Facility_Code|Bed Grade|patientid|City_Code_Patient|Type of Admission|Severity of Illness|Visitors with Patient|Admission_Deposit| Stay|
+-------+-------------+------------------+------------------+--------------------+---------------------------------+------------+---------+------------------+---------+---------+-----------------+-----------------+-------------------+---------------------+-----------------+-----+
|      1|            8|                 c|                 3|                   Z|                                3|radiotherapy|        R|                 F

In [40]:
df_drop_multi = df.drop(*['age','department','Ward_Type'])
df_drop_multi.show(5)

+-------+-------------+------------------+------------------+--------------------+---------------------------------+------------------+---------+---------+-----------------+-----------------+-------------------+---------------------+-----------------+-----+
|case_id|Hospital_code|Hospital_type_code|City_Code_Hospital|Hospital_region_code|Available Extra Rooms in Hospital|Ward_Facility_Code|Bed Grade|patientid|City_Code_Patient|Type of Admission|Severity of Illness|Visitors with Patient|Admission_Deposit| Stay|
+-------+-------------+------------------+------------------+--------------------+---------------------------------+------------------+---------+---------+-----------------+-----------------+-------------------+---------------------+-----------------+-----+
|      1|            8|                 c|                 3|                   Z|                                3|                 F|      2.0|    31397|              7.0|        Emergency|            Extreme|               

### Single-condition filtering: where()

In [42]:
df.where(df.Hospital_code == 8).show(5)

+-------+-------------+------------------+------------------+--------------------+---------------------------------+------------+---------+------------------+---------+---------+-----------------+-----------------+-------------------+---------------------+-----+-----------------+-----+
|case_id|Hospital_code|Hospital_type_code|City_Code_Hospital|Hospital_region_code|Available Extra Rooms in Hospital|  Department|Ward_Type|Ward_Facility_Code|Bed Grade|patientid|City_Code_Patient|Type of Admission|Severity of Illness|Visitors with Patient|  Age|Admission_Deposit| Stay|
+-------+-------------+------------------+------------------+--------------------+---------------------------------+------------+---------+------------------+---------+---------+-----------------+-----------------+-------------------+---------------------+-----+-----------------+-----+
|      1|            8|                 c|                 3|                   Z|                                3|radiotherapy|        R|

### Multiple-condition filtering: where()

In [44]:
df.where((df.Hospital_code == 8) & (df.Department=='gynecology')).show(5)

+-------+-------------+------------------+------------------+--------------------+---------------------------------+----------+---------+------------------+---------+---------+-----------------+-----------------+-------------------+---------------------+-----+-----------------+-----+
|case_id|Hospital_code|Hospital_type_code|City_Code_Hospital|Hospital_region_code|Available Extra Rooms in Hospital|Department|Ward_Type|Ward_Facility_Code|Bed Grade|patientid|City_Code_Patient|Type of Admission|Severity of Illness|Visitors with Patient|  Age|Admission_Deposit| Stay|
+-------+-------------+------------------+------------------+--------------------+---------------------------------+----------+---------+------------------+---------+---------+-----------------+-----------------+-------------------+---------------------+-----+-----------------+-----+
|    179|            8|                 c|                 3|                   Z|                                7|gynecology|        Q|        

### New column: .withColumn()
- Adds a new column or transforms an existing one.

#### New Column

In [46]:
from pyspark.sql.functions import col
df_new = df.withColumn('deposite_per_vistior',col('Admission_Deposit')/col('Visitors with Patient'))
df_new.show(5)

+-------+-------------+------------------+------------------+--------------------+---------------------------------+------------+---------+------------------+---------+---------+-----------------+-----------------+-------------------+---------------------+-----+-----------------+-----+--------------------+
|case_id|Hospital_code|Hospital_type_code|City_Code_Hospital|Hospital_region_code|Available Extra Rooms in Hospital|  Department|Ward_Type|Ward_Facility_Code|Bed Grade|patientid|City_Code_Patient|Type of Admission|Severity of Illness|Visitors with Patient|  Age|Admission_Deposit| Stay|deposite_per_vistior|
+-------+-------------+------------------+------------------+--------------------+---------------------------------+------------+---------+------------------+---------+---------+-----------------+-----------------+-------------------+---------------------+-----+-----------------+-----+--------------------+
|      1|            8|                 c|                 3|               

### Sorting

In [47]:
sorted_df = df.sort(col('Hospital_code').asc(),col('Age').desc())
sorted_df.select('Hospital_code','Age').show()

+-------------+------+
|Hospital_code|   Age|
+-------------+------+
|            1|91-100|
|            1|91-100|
|            1|91-100|
|            1|91-100|
|            1|91-100|
|            1|91-100|
|            1|91-100|
|            1|91-100|
|            1|91-100|
|            1|91-100|
|            1|91-100|
|            1| 81-90|
|            1| 81-90|
|            1| 81-90|
|            1| 81-90|
|            1| 81-90|
|            1| 81-90|
|            1| 81-90|
|            1| 81-90|
|            1| 81-90|
+-------------+------+
only showing top 20 rows



### Groupby and agg function

#### .withColumn - manipulates values inside the table
#### .withColumnRenamed - changes a column name

In [52]:
group_df = df.groupby('Hospital_code') \
    .agg({'Admission_Deposit':'avg'}) \
    .withColumnRenamed('avg(Admission_Deposit)','Avg_admission_deposit') \
    .sort(col('Hospital_code').asc(),col('Avg_admission_deposit').desc())
group_df.show()

+-------------+---------------------+
|Hospital_code|Avg_admission_deposit|
+-------------+---------------------+
|            1|    5080.219089350352|
|            2|   5000.1293610348885|
|            3|    4829.677206295672|
|            4|    5496.309677419355|
|            5|    4935.027941455997|
|            6|    4584.582815177479|
|            7|    5608.380551301684|
|            8|    4933.259623259623|
|            9|    4853.857167680278|
|           10|    4555.867090620031|
|           11|    4926.203716528163|
|           12|    4779.087191289723|
|           13|    5118.668449197861|
|           14|     4820.88758079409|
|           15|   5333.8652911310355|
|           16|     5229.39171887769|
|           17|    4933.987820396292|
|           18|    4548.159504132232|
|           19|    4492.418728498044|
|           20|    5190.156583629893|
+-------------+---------------------+
only showing top 20 rows



In [53]:
group_df.write.csv('dsml-8/output/heath_group.csv',header=True,mode='overwrite')

                                                                                

In [58]:
group_df.write.parquet('dsml-8/output/parquet/',mode='overwrite')

                                                                                

In [54]:
df_load = spark.read.csv('dsml-8/output/heath_group.csv',header=True)
df_load.show(5)

+-------------+---------------------+
|Hospital_code|Avg_admission_deposit|
+-------------+---------------------+
|           15|   5333.8652911310355|
|           22|   4942.3895253682485|
|            2|   5000.1293610348885|
|           10|    4555.867090620031|
|           11|    4926.203716528163|
+-------------+---------------------+
only showing top 5 rows



In [61]:
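# Parquet stores its own schema, so no header option is needed. A single part file
# holds only one partition's rows; load 'dsml-8/output/parquet/' to read the whole dataset.
spark.read.format('parquet').load('dsml-8/output/parquet/part-00000-a30133ef-e6b6-4a93-9351-8269dee70261-c000.snappy.parquet').show()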

+-------------+---------------------+
|Hospital_code|Avg_admission_deposit|
+-------------+---------------------+
|            1|    5080.219089350352|
+-------------+---------------------+



# Spark SQL
### Creating a Database and Table Using a Temporary View or Table

In [2]:
schema_2 = 'id integer, order_date string, customer_id integer, order_status string'

In [3]:
df2= spark.read.format('csv').option('header','true').schema(schema_2).load('dsml-8/orders_1gb.csv')
df2.show(5)
df2.printSchema()

+---+--------------------+-----------+---------------+
| id|          order_date|customer_id|   order_status|
+---+--------------------+-----------+---------------+
|  2|2013-07-25 00:00:...|        256|PENDING_PAYMENT|
|  3|2013-07-25 00:00:...|      12111|       COMPLETE|
|  4|2013-07-25 00:00:...|       8827|         CLOSED|
|  5|2013-07-25 00:00:...|      11318|       COMPLETE|
|  6|2013-07-25 00:00:...|       7130|       COMPLETE|
+---+--------------------+-----------+---------------+
only showing top 5 rows

root
 |-- id: integer (nullable = true)
 |-- order_date: string (nullable = true)
 |-- customer_id: integer (nullable = true)
 |-- order_status: string (nullable = true)



                                                                                

In [4]:
from pyspark.sql.functions import to_timestamp

new_df = df2.withColumnRenamed('order_status','status').withColumn('order_date',to_timestamp('order_date'))
new_df.printSchema()

root
 |-- id: integer (nullable = true)
 |-- order_date: timestamp (nullable = true)
 |-- customer_id: integer (nullable = true)
 |-- status: string (nullable = true)



In [5]:
# Create the database only if it does not already exist
spark.sql('create database if not exists timepro_db')

2024-09-08 06:04:50,388 WARN conf.HiveConf: HiveConf of name hive.stats.jdbc.timeout does not exist
2024-09-08 06:04:50,389 WARN conf.HiveConf: HiveConf of name hive.stats.retries.wait does not exist
2024-09-08 06:04:53,207 WARN metastore.ObjectStore: Version information not found in metastore. hive.metastore.schema.verification is not enabled so recording the schema version 2.3.0
2024-09-08 06:04:53,208 WARN metastore.ObjectStore: setMetaStoreSchemaVersion called but recording version is disabled: version = 2.3.0, comment = Set by MetaStore tplbigdattrain@172.31.16.182
2024-09-08 06:04:53,371 WARN metastore.ObjectStore: Failed to get database global_temp, returning NoSuchObjectException
2024-09-08 06:04:53,377 ERROR metastore.RetryingHMSHandler: AlreadyExistsException(message:Database timepro_db already exists)
	at org.apache.hadoop.hive.metastore.HiveMetaStore$HMSHandler.create_database(HiveMetaStore.java:925)
	at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
	at sun.re

DataFrame[]

In [5]:
spark.sql('show databases').show()

2024-09-08 06:08:43,999 WARN conf.HiveConf: HiveConf of name hive.stats.jdbc.timeout does not exist
2024-09-08 06:08:43,999 WARN conf.HiveConf: HiveConf of name hive.stats.retries.wait does not exist
2024-09-08 06:08:45,752 WARN metastore.ObjectStore: Version information not found in metastore. hive.metastore.schema.verification is not enabled so recording the schema version 2.3.0
2024-09-08 06:08:45,752 WARN metastore.ObjectStore: setMetaStoreSchemaVersion called but recording version is disabled: version = 2.3.0, comment = Set by MetaStore tplbigdattrain@172.31.16.182


+----------+
| namespace|
+----------+
|   default|
|timepro_db|
+----------+



spark.sql('use default')

In [78]:
spark.sql('show tables').show()

+--------+-------------+-----------+
|database|    tableName|isTemporary|
+--------+-------------+-----------+
|        |patient_table|       true|
|        |          xyz|       true|
+--------+-------------+-----------+



In [6]:
spark.sql('use timepro_db')

2024-09-08 06:08:53,051 WARN metastore.ObjectStore: Failed to get database global_temp, returning NoSuchObjectException


DataFrame[]

In [7]:
spark.sql('show tables').show()

+--------+---------+-----------+
|database|tableName|isTemporary|
+--------+---------+-----------+
+--------+---------+-----------+



In [8]:
new_df.createOrReplaceTempView('orders1')

In [9]:
spark.sql('show tables').show()

+--------+---------+-----------+
|database|tableName|isTemporary|
+--------+---------+-----------+
|        |  orders1|       true|
+--------+---------+-----------+



In [11]:
spark.sql('select * from orders1 limit 5').show()

+---+-------------------+-----------+---------------+
| id|         order_date|customer_id|         status|
+---+-------------------+-----------+---------------+
|  2|2013-07-25 00:00:00|        256|PENDING_PAYMENT|
|  3|2013-07-25 00:00:00|      12111|       COMPLETE|
|  4|2013-07-25 00:00:00|       8827|         CLOSED|
|  5|2013-07-25 00:00:00|      11318|       COMPLETE|
|  6|2013-07-25 00:00:00|       7130|       COMPLETE|
+---+-------------------+-----------+---------------+



### Describe table

### Dropping created table: