# DataFrames from Various Sources

In [1]:
import os

# Machine-specific locations of the local Spark and Hadoop installations.
# NOTE(review): these are absolute Windows paths; adjust them for your machine.
SPARK_HOME = r"C:\Users\Dani\Documents\Python Scripts\Spark"
HADOOP_HOME = r"C:\hadoop\hadoop-3.2.2"
HADOOP_BIN = HADOOP_HOME + r"\bin"

os.environ['SPARK_HOME'] = SPARK_HOME
os.environ['PYSPARK_DRIVER_PYTHON'] = 'jupyter'
os.environ['PYSPARK_DRIVER_PYTHON_OPTS'] = 'lab'
os.environ['PYSPARK_PYTHON'] = 'python'
os.environ["HADOOP_HOME"] = HADOOP_HOME

# Add the Hadoop bin directory to PATH only if it is not already there, so that
# re-running this cell does not keep growing PATH. The entries are Windows-style,
# hence the literal ";" separator. A missing PATH is tolerated instead of raising.
_current_path = os.environ.get("PATH", "")
if HADOOP_BIN not in _current_path.split(";"):
    os.environ["PATH"] = f"{_current_path};{HADOOP_BIN}" if _current_path else HADOOP_BIN

In [55]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import countDistinct, mean, median, stddev, isnan, isnull

In [3]:
# Obtain the Spark session for this notebook, reusing an existing one if present.
session_builder = SparkSession.builder.appName("DFVarious")
spark = session_builder.getOrCreate()

## Read CSV

In [4]:
# Load the products CSV: first row is the header, comma-separated,
# and column types are inferred by Spark.
csv_file_path = "data/products.csv"
csv_reader = spark.read.options(header=True, inferSchema=True, sep=",")
df = csv_reader.csv(csv_file_path)

In [5]:
# Preview the loaded rows (up to 20, with long strings truncated).
df.show(n=20, truncate=True)

+---+--------------------+---------------+--------+------+
| id|                name|       category|quantity| price|
+---+--------------------+---------------+--------+------+
|  1|           iPhone 12|    Electronics|      10|899.99|
|  2|     Nike Air Max 90|       Clothing|      25|119.99|
|  3|KitchenAid Stand ...|Home Appliances|       5|299.99|
|  4|    The Great Gatsby|          Books|      50| 12.99|
|  5|L'Oreal Paris Mas...|         Beauty|     100|  9.99|
|  6|            Yoga Mat|         Sports|      30| 29.99|
|  7| Samsung 4K Smart TV|    Electronics|       8|799.99|
|  8|        Levi's Jeans|       Clothing|      15| 49.99|
|  9|Dyson Vacuum Cleaner|Home Appliances|       3|399.99|
| 10| Harry Potter Series|          Books|      20| 15.99|
| 11|        MAC Lipstick|         Beauty|      75| 16.99|
| 12|Adidas Running Shoes|         Sports|      22| 59.99|
| 13|       PlayStation 5|    Electronics|      12|499.99|
| 14|   Hooded Sweatshirt|       Clothing|      10| 34.9

In [6]:
# Print the column names, types and nullability of the DataFrame read from the CSV.
df.printSchema()

root
 |-- id: integer (nullable = true)
 |-- name: string (nullable = true)
 |-- category: string (nullable = true)
 |-- quantity: integer (nullable = true)
 |-- price: double (nullable = true)



### Read with explicit schema

In [7]:
from pyspark.sql.types import StructType, StructField, StringType, IntegerType, DoubleType

In [8]:
# Explicit schema for the products data, listed as (column name, Spark type).
# Every column is declared nullable.
product_columns = [
    ("id", IntegerType()),
    ("name", StringType()),
    ("category", StringType()),
    ("quantity", IntegerType()),
    ("price", DoubleType()),
]
schema = StructType(
    [StructField(column, spark_type, True) for column, spark_type in product_columns]
)

In [9]:
# Re-read the CSV with the explicit schema instead of type inference;
# the first row is still treated as the header.
df = spark.read.schema(schema).option("header", True).csv(csv_file_path)

In [10]:
# Show the schema of the explicitly-typed read, then preview the first five rows.
df.printSchema()
df.show(n=5)

root
 |-- id: integer (nullable = true)
 |-- name: string (nullable = true)
 |-- category: string (nullable = true)
 |-- quantity: integer (nullable = true)
 |-- price: double (nullable = true)

+---+--------------------+---------------+--------+------+
| id|                name|       category|quantity| price|
+---+--------------------+---------------+--------+------+
|  1|           iPhone 12|    Electronics|      10|899.99|
|  2|     Nike Air Max 90|       Clothing|      25|119.99|
|  3|KitchenAid Stand ...|Home Appliances|       5|299.99|
|  4|    The Great Gatsby|          Books|      50| 12.99|
|  5|L'Oreal Paris Mas...|         Beauty|     100|  9.99|
+---+--------------------+---------------+--------+------+
only showing top 5 rows



## Read JSON file

In [11]:
# Read a JSON file with multiLine disabled.
json_file_path = "data/products_singleline.json"
df = spark.read.option("multiLine", False).json(json_file_path)

In [12]:
# Show the schema of the JSON-backed DataFrame, then preview the first five rows.
df.printSchema()
df.show(n=5)

root
 |-- category: string (nullable = true)
 |-- id: long (nullable = true)
 |-- name: string (nullable = true)
 |-- price: double (nullable = true)
 |-- quantity: long (nullable = true)

+---------------+---+--------------------+------+--------+
|       category| id|                name| price|quantity|
+---------------+---+--------------------+------+--------+
|    Electronics|  1|           iPhone 12|899.99|      10|
|       Clothing|  2|     Nike Air Max 90|119.99|      25|
|Home Appliances|  3|KitchenAid Stand ...|299.99|       5|
|          Books|  4|    The Great Gatsby| 12.99|      50|
|         Beauty|  5|L'Oreal Paris Mas...|  9.99|     100|
+---------------+---+--------------------+------+--------+
only showing top 5 rows



In [13]:
# Read a JSON file with multiLine enabled.
json_file_path_multi = "data/products_multiline.json"
df = spark.read.option("multiLine", True).json(json_file_path_multi)

In [14]:
# Show the schema of the multi-line JSON read, then preview the first five rows.
df.printSchema()
df.show(n=5)

root
 |-- category: string (nullable = true)
 |-- id: long (nullable = true)
 |-- name: string (nullable = true)
 |-- price: double (nullable = true)
 |-- quantity: long (nullable = true)

+---------------+---+--------------------+------+--------+
|       category| id|                name| price|quantity|
+---------------+---+--------------------+------+--------+
|    Electronics|  1|           iPhone 12|899.99|      10|
|       Clothing|  2|     Nike Air Max 90|119.99|      25|
|Home Appliances|  3|KitchenAid Stand ...|299.99|       5|
|          Books|  4|    The Great Gatsby| 12.99|      50|
|         Beauty|  5|L'Oreal Paris Mas...|  9.99|     100|
+---------------+---+--------------------+------+--------+
only showing top 5 rows



## Read Parquet file

In [17]:
# Load the house-price dataset from a Parquet file.
parquet_file_path = "data/house-price.parquet"
df = spark.read.format("parquet").load(parquet_file_path)

In [19]:
# Show the schema of the Parquet-backed DataFrame, then preview the first five rows.
df.printSchema()
df.show(n=5)

root
 |-- price: long (nullable = true)
 |-- area: long (nullable = true)
 |-- bedrooms: long (nullable = true)
 |-- bathrooms: long (nullable = true)
 |-- stories: long (nullable = true)
 |-- mainroad: string (nullable = true)
 |-- guestroom: string (nullable = true)
 |-- basement: string (nullable = true)
 |-- hotwaterheating: string (nullable = true)
 |-- airconditioning: string (nullable = true)
 |-- parking: long (nullable = true)
 |-- prefarea: string (nullable = true)
 |-- furnishingstatus: string (nullable = true)

+--------+----+--------+---------+-------+--------+---------+--------+---------------+---------------+-------+--------+----------------+
|   price|area|bedrooms|bathrooms|stories|mainroad|guestroom|basement|hotwaterheating|airconditioning|parking|prefarea|furnishingstatus|
+--------+----+--------+---------+-------+--------+---------+--------+---------------+---------------+-------+--------+----------------+
|13300000|7420|       4|        2|      3|     yes|       no

In [26]:
# Register the DataFrame as a temporary view named "housing" so it can be queried via spark.sql.
df.createOrReplaceTempView("housing")

In [33]:
# Count rows with Spark rather than collecting every row to the driver
# just to take the length of the resulting list.
df.count()

545

In [35]:
# Per-area averages of price, bedrooms and bathrooms, computed with Spark SQL
# against the "housing" view. The adjacent literals concatenate into one query string.
res = spark.sql(
    "SELECT area, AVG(price) AS avg_price, "
    "AVG(bedrooms) AS avg_bedrooms, "
    "AVG(bathrooms) AS avg_bathrooms "
    "FROM housing GROUP BY ALL"
)
res.show()

+-----+------------------+------------------+------------------+
| area|         avg_price|      avg_bedrooms|     avg_bathrooms|
+-----+------------------+------------------+------------------+
| 1950|         2835000.0|               3.0|               1.5|
| 3800|         4147500.0|               2.5|               1.0|
| 5850|         3570000.0|               2.5|               1.0|
| 2520|         3773000.0|               5.0|               2.0|
| 3120|3474333.3333333335|               3.0|               1.0|
| 7424|         3500000.0|               3.0|               1.0|
| 3069|         3150000.0|               2.0|               1.0|
| 7800|         7280000.0|               3.0|               1.5|
| 3640|         3542000.0|2.5714285714285716|1.1428571428571428|
| 3680|         4095000.0|               3.0|               2.0|
| 4300|         6769000.0|               4.5|               2.0|
| 2175|         4270000.0|               3.0|               1.0|
| 2400|         1933575.0

In [46]:
# Same per-area averages via the DataFrame API, using a {column: aggregate} mapping.
avg_by_column = {"price": "avg", "bedrooms": "avg", "bathrooms": "avg"}
grouped_data = df.groupBy("area").agg(avg_by_column)
print("Grouped and Aggregated Data:")
grouped_data.show()

Grouped and Aggregated Data:
+-----+------------------+------------------+------------------+
| area|     avg(bedrooms)|        avg(price)|    avg(bathrooms)|
+-----+------------------+------------------+------------------+
| 1950|               3.0|         2835000.0|               1.5|
| 3800|               2.5|         4147500.0|               1.0|
| 5850|               2.5|         3570000.0|               1.0|
| 2520|               5.0|         3773000.0|               2.0|
| 3120|               3.0|3474333.3333333335|               1.0|
| 7424|               3.0|         3500000.0|               1.0|
| 3069|               2.0|         3150000.0|               1.0|
| 7800|               3.0|         7280000.0|               1.5|
| 3640|2.5714285714285716|         3542000.0|1.1428571428571428|
| 3680|               3.0|         4095000.0|               2.0|
| 4300|               4.5|         6769000.0|               2.0|
| 2175|               3.0|         4270000.0|               1

In [56]:
# Per-area summary: number of distinct furnishing statuses, mean bedrooms,
# median price and standard deviation of price.
# Groups whose standard deviation is NaN or null are filtered out
# (presumably areas with a single row; confirm against the data).
res = (
    df.groupBy("area")
    .agg(
        countDistinct("furnishingstatus").alias("Unique Furnishing Statuses"),
        mean("bedrooms").alias("Mean Bedrooms"),
        # This statistic is a median, so label it as one (it was mislabelled "Mean Price").
        median("price").alias("Median Price"),
        stddev("price").alias("Stddev Price"),
    )
    .filter((~isnan("Stddev Price")) & (~isnull("Stddev Price")))
)
res.show()

+----+--------------------------+------------------+----------+------------------+
|area|Unique Furnishing Statuses|     Mean Bedrooms|Mean Price|      Stddev Price|
+----+--------------------------+------------------+----------+------------------+
|1950|                         1|               3.0| 2835000.0| 841457.0696119915|
|2145|                         2|3.1666666666666665| 3447500.0|368027.39934231347|
|2400|                         2|               3.0| 1933575.0|235360.49211794234|
|2610|                         2|               3.5| 3815000.0| 544472.2215136416|
|2700|                         1|               2.5| 3146500.0| 292035.1006300441|
|2787|                         1|               3.5| 3307500.0|1311683.0791010456|
|2800|                         2|               3.0| 3808000.0| 1623517.169604313|
|2880|                         2|               3.0| 3461500.0| 1331482.068974269|
|2910|                         2|               2.5| 2177000.0| 603869.1911333116|
|300