#Spark Session

In [0]:
# # If you are using Jupyter or another environment outside Databricks, create the session yourself:
# from pyspark.sql import SparkSession
# spark = SparkSession.builder \
#     .appName("My PySpark App") \
#     .getOrCreate()

In Databricks, however, a SparkSession is created for you and can be accessed through the built-in 'spark' variable.

#Using Inbuilt Datasets

dbutils.fs.ls(): This is a Databricks utility command that lists the files in a specified directory on DBFS (the Databricks File System). DBFS is a distributed file system that is available on Databricks clusters. It lets you store data such as file dependencies for jobs or temporary data.

'/databricks-datasets': This is the directory on the DBFS that you want to list. /databricks-datasets is a special directory that contains sample datasets provided by Databricks.

In [0]:
dbutils.fs.ls("/databricks-datasets")

Out[32]: [FileInfo(path='dbfs:/databricks-datasets/COVID/', name='COVID/', size=0, modificationTime=0),
 FileInfo(path='dbfs:/databricks-datasets/README.md', name='README.md', size=976, modificationTime=1532468253000),
 FileInfo(path='dbfs:/databricks-datasets/Rdatasets/', name='Rdatasets/', size=0, modificationTime=0),
 FileInfo(path='dbfs:/databricks-datasets/SPARK_README.md', name='SPARK_README.md', size=3359, modificationTime=1455043490000),
 FileInfo(path='dbfs:/databricks-datasets/adult/', name='adult/', size=0, modificationTime=0),
 FileInfo(path='dbfs:/databricks-datasets/airlines/', name='airlines/', size=0, modificationTime=0),
 FileInfo(path='dbfs:/databricks-datasets/amazon/', name='amazon/', size=0, modificationTime=0),
 FileInfo(path='dbfs:/databricks-datasets/asa/', name='asa/', size=0, modificationTime=0),
 FileInfo(path='dbfs:/databricks-datasets/atlas_higgs/', name='atlas_higgs/', size=0, modificationTime=0),
 FileInfo(path='dbfs:/databricks-datasets/bikeSharing/', na

In [0]:
%fs
ls databricks-datasets

path,name,size,modificationTime
dbfs:/databricks-datasets/COVID/,COVID/,0,0
dbfs:/databricks-datasets/README.md,README.md,976,1532468253000
dbfs:/databricks-datasets/Rdatasets/,Rdatasets/,0,0
dbfs:/databricks-datasets/SPARK_README.md,SPARK_README.md,3359,1455043490000
dbfs:/databricks-datasets/adult/,adult/,0,0
dbfs:/databricks-datasets/airlines/,airlines/,0,0
dbfs:/databricks-datasets/amazon/,amazon/,0,0
dbfs:/databricks-datasets/asa/,asa/,0,0
dbfs:/databricks-datasets/atlas_higgs/,atlas_higgs/,0,0
dbfs:/databricks-datasets/bikeSharing/,bikeSharing/,0,0


In [0]:
%fs

ls databricks-datasets/wine-quality


path,name,size,modificationTime
dbfs:/databricks-datasets/wine-quality/README.md,README.md,1066,1594262736000
dbfs:/databricks-datasets/wine-quality/winequality-red.csv,winequality-red.csv,84199,1594262736000
dbfs:/databricks-datasets/wine-quality/winequality-white.csv,winequality-white.csv,264426,1594262736000


#Reading CSV as DataFrame

In [0]:
file_path = "dbfs:/databricks-datasets/wine-quality/winequality-red.csv"
df = spark.read.csv(file_path, header=True, inferSchema=True, sep=";")

The file_path is set to "dbfs:/databricks-datasets/wine-quality/winequality-red.csv", which is the path to the CSV file in the Databricks File System (DBFS). The file contains physicochemical measurements (acidity, sugar, pH, alcohol and so on) and a quality score for red wines.

The spark.read.csv function is then used to read this CSV file into a PySpark DataFrame named df. The header=True argument specifies that the first row of the CSV file should be treated as the header (i.e., column names), inferSchema=True tells PySpark to automatically infer the data type of each column based on its content, and sep=";" indicates that the values in the CSV file are separated by semicolons.

In [0]:
df.show(5)

+-------------+----------------+-----------+--------------+---------+-------------------+--------------------+-------+----+---------+-------+-------+
|fixed acidity|volatile acidity|citric acid|residual sugar|chlorides|free sulfur dioxide|total sulfur dioxide|density|  pH|sulphates|alcohol|quality|
+-------------+----------------+-----------+--------------+---------+-------------------+--------------------+-------+----+---------+-------+-------+
|          7.4|             0.7|        0.0|           1.9|    0.076|               11.0|                34.0| 0.9978|3.51|     0.56|    9.4|      5|
|          7.8|            0.88|        0.0|           2.6|    0.098|               25.0|                67.0| 0.9968| 3.2|     0.68|    9.8|      5|
|          7.8|            0.76|       0.04|           2.3|    0.092|               15.0|                54.0|  0.997|3.26|     0.65|    9.8|      5|
|         11.2|            0.28|       0.56|           1.9|    0.075|               17.0|           

The df.show(5) command in Databricks will display the first 5 rows of the DataFrame df in a tabular form for quick inspection or debugging.

#Importing Functions

In [0]:
from pyspark.sql.functions import * 

from pyspark.sql.functions import * imports all public functions from the pyspark.sql.functions module in PySpark.

The pyspark.sql.functions module contains a collection of functions used for handling data in PySpark, including string manipulations, date manipulations, mathematical functions, and many more.

By importing everything with *, you can use these functions directly by name, without prefixing them with pyspark.sql.functions. each time. Note that this also replaces Python built-ins of the same name, such as sum, min, max and round, with their Spark counterparts.

#Checking for NULL in all columns of dataframe

Using PySpark

In [0]:
# For each column, show a true/false flag per row that tells whether the value is null
# (the loop variable is named col_name so it does not shadow the imported col function)
for col_name in df.columns:
    df.select(isnull(df[col_name])).show()

+-----------------------+
|(fixed acidity IS NULL)|
+-----------------------+
|                  false|
|                  false|
|                  false|
|                  false|
|                  false|
|                  false|
|                  false|
|                  false|
|                  false|
|                  false|
|                  false|
|                  false|
|                  false|
|                  false|
|                  false|
|                  false|
|                  false|
|                  false|
|                  false|
|                  false|
+-----------------------+
only showing top 20 rows

+--------------------------+
|(volatile acidity IS NULL)|
+--------------------------+
|                     false|
|                     false|
|                     false|
|                     false|
|                     false|
|                     false|
|                     false|
|                     false|
|                     false|
| 

Using SQL Query

In [0]:
# Register the DataFrame as a SQL temporary view
df.createOrReplaceTempView("wine")

# Use SQL to count the null values in each column
for col_name in df.columns:
    null_count = spark.sql(f"SELECT COUNT(*) as null_count FROM wine WHERE `{col_name}` IS NULL").collect()[0]["null_count"]
    print(f"The column '{col_name}' has {null_count} null values")

The column 'fixed acidity' has 0 null values
The column 'volatile acidity' has 0 null values
The column 'citric acid' has 0 null values
The column 'residual sugar' has 0 null values
The column 'chlorides' has 0 null values
The column 'free sulfur dioxide' has 0 null values
The column 'total sulfur dioxide' has 0 null values
The column 'density' has 0 null values
The column 'pH' has 0 null values
The column 'sulphates' has 0 null values
The column 'alcohol' has 0 null values
The column 'quality' has 0 null values


#Sorting

Using PySpark

In [0]:
# Sort DataFrame based on pH and residual sugar in descending order, then show the result
sorted_df = df.orderBy(desc("pH"), desc("residual sugar"))
sorted_df.show()

+-------------+----------------+-----------+--------------+---------+-------------------+--------------------+-------+----+---------+-------+-------+
|fixed acidity|volatile acidity|citric acid|residual sugar|chlorides|free sulfur dioxide|total sulfur dioxide|density|  pH|sulphates|alcohol|quality|
+-------------+----------------+-----------+--------------+---------+-------------------+--------------------+-------+----+---------+-------+-------+
|          5.4|            0.74|        0.0|           1.2|    0.041|               16.0|                46.0|0.99258|4.01|     0.59|   12.5|      6|
|          5.0|            0.74|        0.0|           1.2|    0.041|               16.0|                46.0|0.99258|4.01|     0.59|   12.5|      6|
|          4.6|            0.52|       0.15|           2.1|    0.054|                8.0|                65.0| 0.9934| 3.9|     0.56|   13.1|      4|
|          5.1|            0.47|       0.02|           1.3|    0.034|               18.0|           

Using SQL Query

In [0]:
# Use SQL to order by pH and residual sugar in descending order
sorted_df = spark.sql("SELECT * FROM wine ORDER BY pH DESC, `residual sugar` DESC")
sorted_df.show()

+-------------+----------------+-----------+--------------+---------+-------------------+--------------------+-------+----+---------+-------+-------+
|fixed acidity|volatile acidity|citric acid|residual sugar|chlorides|free sulfur dioxide|total sulfur dioxide|density|  pH|sulphates|alcohol|quality|
+-------------+----------------+-----------+--------------+---------+-------------------+--------------------+-------+----+---------+-------+-------+
|          5.4|            0.74|        0.0|           1.2|    0.041|               16.0|                46.0|0.99258|4.01|     0.59|   12.5|      6|
|          5.0|            0.74|        0.0|           1.2|    0.041|               16.0|                46.0|0.99258|4.01|     0.59|   12.5|      6|
|          4.6|            0.52|       0.15|           2.1|    0.054|                8.0|                65.0| 0.9934| 3.9|     0.56|   13.1|      4|
|          5.1|            0.47|       0.02|           1.3|    0.034|               18.0|           

#Filtering

Using PySpark

In [0]:
# Filter DataFrame to include only rows where pH is between 3.4 and 3.6, inclusive, then show the result
filtered_df = df.filter((df["pH"] >= 3.4) & (df["pH"] <= 3.6))
filtered_df.show()

+-------------+----------------+-----------+--------------+---------+-------------------+--------------------+-------+----+---------+-------+-------+
|fixed acidity|volatile acidity|citric acid|residual sugar|chlorides|free sulfur dioxide|total sulfur dioxide|density|  pH|sulphates|alcohol|quality|
+-------------+----------------+-----------+--------------+---------+-------------------+--------------------+-------+----+---------+-------+-------+
|          7.4|             0.7|        0.0|           1.9|    0.076|               11.0|                34.0| 0.9978|3.51|     0.56|    9.4|      5|
|          7.4|             0.7|        0.0|           1.9|    0.076|               11.0|                34.0| 0.9978|3.51|     0.56|    9.4|      5|
|          7.4|            0.66|        0.0|           1.8|    0.075|               13.0|                40.0| 0.9978|3.51|     0.56|    9.4|      5|
|          5.6|           0.615|        0.0|           1.6|    0.089|               16.0|           

Using SQL Query

In [0]:
# Use SQL to filter by pH between 3.4 and 3.6
filtered_df = spark.sql("SELECT * FROM wine WHERE pH BETWEEN 3.4 AND 3.6")
filtered_df.show()

+-------------+----------------+-----------+--------------+---------+-------------------+--------------------+-------+----+---------+-------+-------+
|fixed acidity|volatile acidity|citric acid|residual sugar|chlorides|free sulfur dioxide|total sulfur dioxide|density|  pH|sulphates|alcohol|quality|
+-------------+----------------+-----------+--------------+---------+-------------------+--------------------+-------+----+---------+-------+-------+
|          7.4|             0.7|        0.0|           1.9|    0.076|               11.0|                34.0| 0.9978|3.51|     0.56|    9.4|      5|
|          7.4|             0.7|        0.0|           1.9|    0.076|               11.0|                34.0| 0.9978|3.51|     0.56|    9.4|      5|
|          7.4|            0.66|        0.0|           1.8|    0.075|               13.0|                40.0| 0.9978|3.51|     0.56|    9.4|      5|
|          5.6|           0.615|        0.0|           1.6|    0.089|               16.0|           

#Group By

Using PySpark

In [0]:
# Group DataFrame by 'quality' column, count number of rows in each group, then show the result
grouped_df = df.groupBy("quality").count()
grouped_df.show()

+-------+-----+
|quality|count|
+-------+-----+
|      6|  638|
|      3|   10|
|      5|  681|
|      4|   53|
|      8|   18|
|      7|  199|
+-------+-----+



Using SQL Query

In [0]:
# Execute SQL query to group data by 'quality' and count number of rows in each group, then show the result
grouped_df = spark.sql("SELECT quality, COUNT(*) as count FROM wine GROUP BY quality")
grouped_df.show()


+-------+-----+
|quality|count|
+-------+-----+
|      6|  638|
|      3|   10|
|      5|  681|
|      4|   53|
|      8|   18|
|      7|  199|
+-------+-----+



#Aggregate and Group By

Using PySpark

In [0]:
# Group DataFrame by 'quality' column, calculate average of 'pH' and 'residual sugar' in each group, then show the result
aggregated_df = df.groupBy("quality").agg(avg("pH").alias("avg_pH"), avg("residual sugar").alias("avg_residual_sugar"))
aggregated_df.show()


+-------+------------------+------------------+
|quality|            avg_pH|avg_residual_sugar|
+-------+------------------+------------------+
|      6|3.3180721003134837| 2.477194357366772|
|      3|3.3979999999999997|2.6350000000000002|
|      5|3.3049486049926546| 2.528854625550658|
|      4| 3.381509433962264|  2.69433962264151|
|      8|3.2672222222222214|2.5777777777777775|
|      7| 3.290753768844219|2.7206030150753793|
+-------+------------------+------------------+



Using SQL Query

In [0]:
# Execute SQL query to group data by 'quality' and calculate average of 'pH' and 'residual sugar' in each group, then show the result
aggregated_df = spark.sql("SELECT quality, AVG(pH) as avg_pH, AVG(`residual sugar`) as avg_residual_sugar FROM wine GROUP BY quality")
aggregated_df.show()


+-------+------------------+------------------+
|quality|            avg_pH|avg_residual_sugar|
+-------+------------------+------------------+
|      6|3.3180721003134837| 2.477194357366772|
|      3|3.3979999999999997|2.6350000000000002|
|      5|3.3049486049926546| 2.528854625550658|
|      4| 3.381509433962264|  2.69433962264151|
|      8|3.2672222222222214|2.5777777777777775|
|      7| 3.290753768844219|2.7206030150753793|
+-------+------------------+------------------+



#Joins

In [0]:
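# df2 is a second reference to the same DataFrame (no data is copied);
# it serves as the other side of the self-join below
df2 = df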

Using PySpark

In [0]:
# Join df and df2 on 'quality' column, then show the result
joined_df = df.join(df2, df.quality == df2.quality)
joined_df.show()

+-------------+----------------+-----------+--------------+---------+-------------------+--------------------+-------+----+---------+-------+-------+-------------+----------------+-----------+--------------+---------+-------------------+--------------------+-------+----+---------+-------+-------+
|fixed acidity|volatile acidity|citric acid|residual sugar|chlorides|free sulfur dioxide|total sulfur dioxide|density|  pH|sulphates|alcohol|quality|fixed acidity|volatile acidity|citric acid|residual sugar|chlorides|free sulfur dioxide|total sulfur dioxide|density|  pH|sulphates|alcohol|quality|
+-------------+----------------+-----------+--------------+---------+-------------------+--------------------+-------+----+---------+-------+-------+-------------+----------------+-----------+--------------+---------+-------------------+--------------------+-------+----+---------+-------+-------+
|          7.4|             0.7|        0.0|           1.9|    0.076|               11.0|                3

Using SQL Query

In [0]:
# Create or replace a temporary view named 'wine2' from df2
df2.createOrReplaceTempView("wine2")
# Execute SQL query to join 'wine' and 'wine2' on 'quality' column, then show the result
joined_df = spark.sql("SELECT * FROM wine JOIN wine2 ON wine.quality = wine2.quality")
joined_df.show()

+-------------+----------------+-----------+--------------+---------+-------------------+--------------------+-------+----+---------+-------+-------+-------------+----------------+-----------+--------------+---------+-------------------+--------------------+-------+----+---------+-------+-------+
|fixed acidity|volatile acidity|citric acid|residual sugar|chlorides|free sulfur dioxide|total sulfur dioxide|density|  pH|sulphates|alcohol|quality|fixed acidity|volatile acidity|citric acid|residual sugar|chlorides|free sulfur dioxide|total sulfur dioxide|density|  pH|sulphates|alcohol|quality|
+-------------+----------------+-----------+--------------+---------+-------------------+--------------------+-------+----+---------+-------+-------+-------------+----------------+-----------+--------------+---------+-------------------+--------------------+-------+----+---------+-------+-------+
|          7.4|             0.7|        0.0|           1.9|    0.076|               11.0|                3

#Summary

In [0]:
# Generate descriptive statistics summary of DataFrame
summary = df.describe()

In [0]:
summary.show()

+-------+------------------+-------------------+-------------------+------------------+--------------------+-------------------+--------------------+--------------------+-------------------+------------------+------------------+------------------+
|summary|     fixed acidity|   volatile acidity|        citric acid|    residual sugar|           chlorides|free sulfur dioxide|total sulfur dioxide|             density|                 pH|         sulphates|           alcohol|           quality|
+-------+------------------+-------------------+-------------------+------------------+--------------------+-------------------+--------------------+--------------------+-------------------+------------------+------------------+------------------+
|  count|              1599|               1599|               1599|              1599|                1599|               1599|                1599|                1599|               1599|              1599|              1599|              1599|
|   mean