In [16]:
import sys
import os



# Add PySpark and Python paths manually (adjust paths as needed)
spark_home = os.environ.get('SPARK_HOME', None)  # Optional: Set if using standalone Spark
python_path = os.path.join(spark_home, 'python') if spark_home else ''
sys.path.insert(0, python_path)
sys.path.insert(0, os.path.join(python_path, 'lib', 'py4j-0.10.9.7-src.zip'))  # Match your Py4J version

In [17]:
os.environ['SPARK_HOME'] = "/Applications/spark-3.5.5-bin-hadoop3"
os.environ['PYSPARK_DRIVER_PYTHON'] = 'jupyter'
os.environ['PYSPARK_DRIVER_PYTHON_OPTS'] = 'lab'
os.environ['PYSPARK_PYTHON'] = 'python'

In [18]:
from pyspark.sql import SparkSession

spark = SparkSession.builder \
        .appName("CREATE-DATAFRAME").getOrCreate()


In [19]:
%%bash

head -10 ./data_csv/power_consumption.csv

DateTime,Temperature,Humidity,Wind Speed,general diffuse flows,diffuse flows,Zone 1,Zone 2  ,Zone 3  
01-01-2017 00:00,6.559,73.8,0.083,0.051,0.119,34055.6962,16128.87538,20240.96386
01-01-2017 00:10,6.414,74.5,0.083,0.07,0.085,29814.68354,19375.07599,20131.08434
01-01-2017 00:20,6.313,74.5,0.08,0.062,0.1,29128.10127,19006.68693,19668.43373
01-01-2017 00:30,6.121,75,0.083,0.091,0.096,28228.86076,18361.09422,18899.27711
01-01-2017 00:40,5.921,75.7,0.081,0.048,0.085,27335.6962,17872.34043,18442.40964
01-01-2017 00:50,5.853,76.9,0.081,0.059,0.108,26624.81013,17416.41337,18130.12048
01-01-2017 01:00,5.641,77.7,0.08,0.048,0.096,25998.98734,16993.31307,17945.06024
01-01-2017 01:10,5.496,78.2,0.085,0.055,0.093,25446.07595,16661.39818,17459.27711
01-01-2017 01:20,5.678,78.1,0.081,0.066,0.141,24777.72152,16227.35562,17025.54217


In [20]:
# Read CSV file into DataFrame
csv_file_path = "./data_files/power_consumption.csv"
df = spark.read.csv(csv_file_path, header= True)

In [21]:
# Display the schema of DataFrame
df.printSchema()

# Display content of DataFrame
df.show(5)

root
 |-- DateTime: string (nullable = true)
 |-- Temperature: string (nullable = true)
 |-- Humidity: string (nullable = true)
 |-- Wind Speed: string (nullable = true)
 |-- general diffuse flows: string (nullable = true)
 |-- diffuse flows: string (nullable = true)
 |-- Zone 1: string (nullable = true)
 |-- Zone 2  : string (nullable = true)
 |-- Zone 3  : string (nullable = true)

+----------------+-----------+--------+----------+---------------------+-------------+-----------+-----------+-----------+
|        DateTime|Temperature|Humidity|Wind Speed|general diffuse flows|diffuse flows|     Zone 1|   Zone 2  |   Zone 3  |
+----------------+-----------+--------+----------+---------------------+-------------+-----------+-----------+-----------+
|01-01-2017 00:00|      6.559|    73.8|     0.083|                0.051|        0.119| 34055.6962|16128.87538|20240.96386|
|01-01-2017 00:10|      6.414|    74.5|     0.083|                 0.07|        0.085|29814.68354|19375.07599|20131.08434

# Read CSV with an explicit schema definition

In [26]:
from pyspark.sql.types import StructField, StructType, StringType, IntegerType, DoubleType, DateType

In [30]:
# Define the schema

schema = StructType([
    StructField(name= "DateTime", dataType= StringType(), nullable= True),
    StructField(name= "Temperature", dataType= DoubleType(), nullable = True), 
    StructField(name= "Humidity", dataType= DoubleType(), nullable = True), 
    StructField(name= "Wind Speed", dataType= DoubleType(), nullable = True), 
    StructField(name= "general diffuse", dataType= DoubleType(), nullable = True), 
    StructField(name= "diffuse flows", dataType= DoubleType(), nullable = True), 
    StructField(name= "Zone 1", dataType= DoubleType(), nullable = True), 
    StructField(name= "Zone 2", dataType= DoubleType(), nullable = True), 
    StructField(name= "Zone 3", dataType= DoubleType(), nullable = True)
])
    

In [31]:
# reading from csv file using schema
df = spark.read.csv(csv_file_path, header= True, schema= schema )

In [32]:
df.printSchema()

df.show(3)

root
 |-- DateTime: string (nullable = true)
 |-- Temperature: double (nullable = true)
 |-- Humidity: double (nullable = true)
 |-- Wind Speed: double (nullable = true)
 |-- general diffuse: double (nullable = true)
 |-- diffuse flows: double (nullable = true)
 |-- Zone 1: double (nullable = true)
 |-- Zone 2: double (nullable = true)
 |-- Zone 3: double (nullable = true)

+----------------+-----------+--------+----------+---------------+-------------+-----------+-----------+-----------+
|        DateTime|Temperature|Humidity|Wind Speed|general diffuse|diffuse flows|     Zone 1|     Zone 2|     Zone 3|
+----------------+-----------+--------+----------+---------------+-------------+-----------+-----------+-----------+
|01-01-2017 00:00|      6.559|    73.8|     0.083|          0.051|        0.119| 34055.6962|16128.87538|20240.96386|
|01-01-2017 00:10|      6.414|    74.5|     0.083|           0.07|        0.085|29814.68354|19375.07599|20131.08434|
|01-01-2017 00:20|      6.313|    74.5

25/04/15 11:24:00 WARN CSVHeaderChecker: CSV header does not conform to the schema.
 Header: DateTime, Temperature, Humidity, Wind Speed, general diffuse flows, diffuse flows, Zone 1, Zone 2  , Zone 3  
 Schema: DateTime, Temperature, Humidity, Wind Speed, general diffuse, diffuse flows, Zone 1, Zone 2, Zone 3
Expected: general diffuse but found: general diffuse flows
CSV file: file:///Users/apple/projects/data_projects/data_ing/PySpark-tutoriel/data_csv/power_consumption.csv


# Read csv with inferSchema

In [33]:
# spark guess the schema automatically when using inferSchema

# reading from csv file using schema
csv_file_path = "./data_files/power_consumption.csv"
df = spark.read.csv(csv_file_path, header= True, inferSchema= True)

df.printSchema()

df.show(3)

# Read JSON file into DataFrame

In [37]:
%%bash

head -10 ./data_files/people.json

{"name":"Michael","age":45}
{"name":"Andy", "age":30}
{"name":"Justin", "age":19}
{"name":"Driss","age":25}
{"name":"Ali", "age":20}
{"name":"Ahmed", "age":22}
{"name":"Amine","age":17}
{"name":"Azzedine", "age":27}
{"name":"Aziz", "age":55}
{"name":"Mohcine","age":18}


In [39]:
json_file_path = "./data_files/people.json"

df =spark.read.json(json_file_path)

In [40]:
df.printSchema()

df.show(5)

root
 |-- age: long (nullable = true)
 |-- name: string (nullable = true)

+---+-------+
|age|   name|
+---+-------+
| 45|Michael|
| 30|   Andy|
| 19| Justin|
| 25|  Driss|
| 20|    Ali|
+---+-------+
only showing top 5 rows



# Write dataframe into parquet file 

In [41]:
parquet_file_path = "./data_files/people.parquet"

In [42]:
df.write.parquet(parquet_file_path)

                                                                                

# Read parquet file into dataframe

In [43]:
df = spark.read.parquet(parquet_file_path)

In [45]:
df.printSchema

df.show(7)

+---+-------+
|age|   name|
+---+-------+
| 45|Michael|
| 30|   Andy|
| 19| Justin|
| 25|  Driss|
| 20|    Ali|
| 22|  Ahmed|
| 17|  Amine|
+---+-------+
only showing top 7 rows



In [46]:
spark.stop()