In [0]:
# Step 1: Read the CSV file using Spark
df = spark.read.format("csv") \
    .option("header", "true") \
    .option("inferSchema", "true") \
    .load("/Volumes/hexadatabrickswp_v2/default/jobdata/employees.csv")

# Step 2: Create a temporary view to query the data using SQL
df.createOrReplaceTempView("employees_view")

# Step 3: Query the data using SQL
display(spark.sql("SELECT * FROM employees_view"))

EMPLOYEE_ID,FIRST_NAME,LAST_NAME,EMAIL,PHONE_NUMBER,HIRE_DATE,JOB_ID,SALARY,COMMISSION_PCT,MANAGER_ID,DEPARTMENT_ID
198,Donald,OConnell,DOCONNEL,650.507.9833,21-JUN-07,SH_CLERK,2600,-,124,50
199,Douglas,Grant,DGRANT,650.507.9844,13-JAN-08,SH_CLERK,2600,-,124,50
200,Jennifer,Whalen,JWHALEN,515.123.4444,17-SEP-03,AD_ASST,4400,-,101,10
201,Michael,Hartstein,MHARTSTE,515.123.5555,17-FEB-04,MK_MAN,13000,-,100,20
202,Pat,Fay,PFAY,603.123.6666,17-AUG-05,MK_REP,6000,-,201,20
203,Susan,Mavris,SMAVRIS,515.123.7777,07-JUN-02,HR_REP,6500,-,101,40
204,Hermann,Baer,HBAER,515.123.8888,07-JUN-02,PR_REP,10000,-,101,70
205,Shelley,Higgins,SHIGGINS,515.123.8080,07-JUN-02,AC_MGR,12008,-,101,110
206,William,Gietz,WGIETZ,515.123.8181,07-JUN-02,AC_ACCOUNT,8300,-,205,110
100,Steven,King,SKING,515.123.4567,17-JUN-03,AD_PRES,24000,-,-,90


In [0]:
# Display the schema to verify column types
df.printSchema()

# Show sample data
display(df)

root
 |-- EMPLOYEE_ID: integer (nullable = true)
 |-- FIRST_NAME: string (nullable = true)
 |-- LAST_NAME: string (nullable = true)
 |-- EMAIL: string (nullable = true)
 |-- PHONE_NUMBER: string (nullable = true)
 |-- HIRE_DATE: string (nullable = true)
 |-- JOB_ID: string (nullable = true)
 |-- SALARY: integer (nullable = true)
 |-- COMMISSION_PCT: string (nullable = true)
 |-- MANAGER_ID: string (nullable = true)
 |-- DEPARTMENT_ID: integer (nullable = true)



EMPLOYEE_ID,FIRST_NAME,LAST_NAME,EMAIL,PHONE_NUMBER,HIRE_DATE,JOB_ID,SALARY,COMMISSION_PCT,MANAGER_ID,DEPARTMENT_ID
198,Donald,OConnell,DOCONNEL,650.507.9833,21-JUN-07,SH_CLERK,2600,-,124,50
199,Douglas,Grant,DGRANT,650.507.9844,13-JAN-08,SH_CLERK,2600,-,124,50
200,Jennifer,Whalen,JWHALEN,515.123.4444,17-SEP-03,AD_ASST,4400,-,101,10
201,Michael,Hartstein,MHARTSTE,515.123.5555,17-FEB-04,MK_MAN,13000,-,100,20
202,Pat,Fay,PFAY,603.123.6666,17-AUG-05,MK_REP,6000,-,201,20
203,Susan,Mavris,SMAVRIS,515.123.7777,07-JUN-02,HR_REP,6500,-,101,40
204,Hermann,Baer,HBAER,515.123.8888,07-JUN-02,PR_REP,10000,-,101,70
205,Shelley,Higgins,SHIGGINS,515.123.8080,07-JUN-02,AC_MGR,12008,-,101,110
206,William,Gietz,WGIETZ,515.123.8181,07-JUN-02,AC_ACCOUNT,8300,-,205,110
100,Steven,King,SKING,515.123.4567,17-JUN-03,AD_PRES,24000,-,-,90


In [0]:
%sql
-- Step 1: Create a temporary view from the CSV file
CREATE OR REPLACE TEMPORARY VIEW employees_csv_view
USING CSV
OPTIONS (
  path "/Volumes/hexadatabrickswp_v2/default/jobdata/employees.csv", -- Use your actual path
  header "true",
  inferSchema "true"
);

-- Step 2: Now create a view from the temporary view
CREATE OR REPLACE TEMPORARY VIEW employees_view AS
SELECT 
    EMPLOYEE_ID, 
    FIRST_NAME, 
    LAST_NAME, 
    DEPARTMENT_ID
FROM 
    employees_csv_view  -- Use the name of your temporary view here
WHERE 
    DEPARTMENT_ID = '60';

-- Step 3: Query the new view
SELECT * FROM employees_view;

EMPLOYEE_ID,FIRST_NAME,LAST_NAME,DEPARTMENT_ID
103,Alexander,Hunold,60
104,Bruce,Ernst,60
105,David,Austin,60
106,Valli,Pataballa,60
107,Diana,Lorentz,60


In [0]:
# Count total records
display(spark.sql("SELECT COUNT(*) AS total_employees FROM employees_view"))

# Show unique job titles
display(spark.sql("SELECT DISTINCT EMPLOYEE_ID FROM employees_view ORDER BY EMPLOYEE_ID"))



total_employees
5


EMPLOYEE_ID
103
104
105
106
107


In [0]:
# Step 1: Read the CSV file using Spark
df = spark.read.format("csv") \
    .option("header", "true") \
    .option("inferSchema", "true") \
    .load("/Volumes/hexadatabrickswp_v2/default/jobdata/employees.csv")

# Step 2: Create a temporary view to query the data using SQL
df.createOrReplaceTempView("employees_view")

# Step 3: Query the data using SQL
display(spark.sql("SELECT * FROM employees_view"))

EMPLOYEE_ID,FIRST_NAME,LAST_NAME,EMAIL,PHONE_NUMBER,HIRE_DATE,JOB_ID,SALARY,COMMISSION_PCT,MANAGER_ID,DEPARTMENT_ID
198,Donald,OConnell,DOCONNEL,650.507.9833,21-JUN-07,SH_CLERK,2600,-,124,50
199,Douglas,Grant,DGRANT,650.507.9844,13-JAN-08,SH_CLERK,2600,-,124,50
200,Jennifer,Whalen,JWHALEN,515.123.4444,17-SEP-03,AD_ASST,4400,-,101,10
201,Michael,Hartstein,MHARTSTE,515.123.5555,17-FEB-04,MK_MAN,13000,-,100,20
202,Pat,Fay,PFAY,603.123.6666,17-AUG-05,MK_REP,6000,-,201,20
203,Susan,Mavris,SMAVRIS,515.123.7777,07-JUN-02,HR_REP,6500,-,101,40
204,Hermann,Baer,HBAER,515.123.8888,07-JUN-02,PR_REP,10000,-,101,70
205,Shelley,Higgins,SHIGGINS,515.123.8080,07-JUN-02,AC_MGR,12008,-,101,110
206,William,Gietz,WGIETZ,515.123.8181,07-JUN-02,AC_ACCOUNT,8300,-,205,110
100,Steven,King,SKING,515.123.4567,17-JUN-03,AD_PRES,24000,-,-,90


In [0]:
# 1. Create a department summary view
spark.sql("""
CREATE OR REPLACE TEMP VIEW department_stats AS
SELECT 
  DEPARTMENT_ID,
  COUNT(*) AS employee_count,
  ROUND(AVG(SALARY), 2) AS avg_salary,
  SUM(SALARY) AS total_salary
FROM employees_view
WHERE DEPARTMENT_ID IS NOT NULL
GROUP BY DEPARTMENT_ID
ORDER BY DEPARTMENT_ID
""")

# 2. Create an employee directory view
spark.sql("""
CREATE OR REPLACE TEMP VIEW employee_directory AS
SELECT 
  EMPLOYEE_ID,
  CONCAT(FIRST_NAME, ' ', LAST_NAME) AS full_name,
  EMAIL,
  PHONE_NUMBER,
  SALARY,
  DEPARTMENT_ID
FROM employees_view
ORDER BY LAST_NAME, FIRST_NAME
""")

DataFrame[]

In [0]:
# Show department statistics
display(spark.sql("SELECT * FROM department_stats"))

# Search for employees in a specific department
display(spark.sql("""
  SELECT * FROM employee_directory 
  WHERE DEPARTMENT_ID = 50
  ORDER BY SALARY DESC
"""))

# Find high earners
display(spark.sql("""
  SELECT * FROM employee_directory
  WHERE SALARY > 10000
  ORDER BY SALARY DESC
"""))

DEPARTMENT_ID,employee_count,avg_salary,total_salary
10,1,4400.0,4400
20,2,9500.0,19000
30,6,4150.0,24900
40,1,6500.0,6500
50,23,3721.74,85600
60,5,5760.0,28800
70,1,10000.0,10000
90,3,19333.33,58000
100,6,8601.33,51608
110,2,10154.0,20308


EMPLOYEE_ID,full_name,EMAIL,PHONE_NUMBER,SALARY,DEPARTMENT_ID
121,Adam Fripp,AFRIPP,650.123.2234,8200,50
120,Matthew Weiss,MWEISS,650.123.1234,8000,50
122,Payam Kaufling,PKAUFLIN,650.123.3234,7900,50
123,Shanta Vollman,SVOLLMAN,650.123.4234,6500,50
124,Kevin Mourgos,KMOURGOS,650.123.5234,5800,50
137,Renske Ladwig,RLADWIG,650.121.1234,3600,50
133,Jason Mallin,JMALLIN,650.127.1934,3300,50
129,Laura Bissot,LBISSOT,650.124.5234,3300,50
138,Stephen Stiles,SSTILES,650.121.2034,3200,50
125,Julia Nayer,JNAYER,650.124.1214,3200,50


EMPLOYEE_ID,full_name,EMAIL,PHONE_NUMBER,SALARY,DEPARTMENT_ID
100,Steven King,SKING,515.123.4567,24000,90
102,Lex De Haan,LDEHAAN,515.123.4569,17000,90
101,Neena Kochhar,NKOCHHAR,515.123.4568,17000,90
201,Michael Hartstein,MHARTSTE,515.123.5555,13000,20
205,Shelley Higgins,SHIGGINS,515.123.8080,12008,110
108,Nancy Greenberg,NGREENBE,515.124.4569,12008,100
114,Den Raphaely,DRAPHEAL,515.127.4561,11000,30


In [0]:
# Save as a Delta table for better performance
df.write.format("delta").mode("overwrite").saveAsTable("employees_delta")


[0;31m---------------------------------------------------------------------------[0m
[0;31mNameError[0m                                 Traceback (most recent call last)
File [0;32m<command-7325561520002608>, line 2[0m
[1;32m      1[0m [38;5;66;03m# Save as a Delta table for better performance[39;00m
[0;32m----> 2[0m df[38;5;241m.[39mwrite[38;5;241m.[39mformat([38;5;124m"[39m[38;5;124mdelta[39m[38;5;124m"[39m)[38;5;241m.[39mmode([38;5;124m"[39m[38;5;124moverwrite[39m[38;5;124m"[39m)[38;5;241m.[39msaveAsTable([38;5;124m"[39m[38;5;124memployees_delta[39m[38;5;124m"[39m)

[0;31mNameError[0m: name 'df' is not defined

In [0]:
%sql
CREATE OR REPLACE TABLE emp(name STRING, salary BIGINT);
SELECT *, typeof(SALARY)AS salary_type FROM employees_view;

EMPLOYEE_ID,FIRST_NAME,LAST_NAME,EMAIL,PHONE_NUMBER,HIRE_DATE,JOB_ID,SALARY,COMMISSION_PCT,MANAGER_ID,DEPARTMENT_ID,salary_type
198,Donald,OConnell,DOCONNEL,650.507.9833,21-JUN-07,SH_CLERK,2600,-,124,50,int
199,Douglas,Grant,DGRANT,650.507.9844,13-JAN-08,SH_CLERK,2600,-,124,50,int
200,Jennifer,Whalen,JWHALEN,515.123.4444,17-SEP-03,AD_ASST,4400,-,101,10,int
201,Michael,Hartstein,MHARTSTE,515.123.5555,17-FEB-04,MK_MAN,13000,-,100,20,int
202,Pat,Fay,PFAY,603.123.6666,17-AUG-05,MK_REP,6000,-,201,20,int
203,Susan,Mavris,SMAVRIS,515.123.7777,07-JUN-02,HR_REP,6500,-,101,40,int
204,Hermann,Baer,HBAER,515.123.8888,07-JUN-02,PR_REP,10000,-,101,70,int
205,Shelley,Higgins,SHIGGINS,515.123.8080,07-JUN-02,AC_MGR,12008,-,101,110,int
206,William,Gietz,WGIETZ,515.123.8181,07-JUN-02,AC_ACCOUNT,8300,-,205,110,int
100,Steven,King,SKING,515.123.4567,17-JUN-03,AD_PRES,24000,-,-,90,int


In [0]:
%sql
ALTER TABLE emp ADD COLUMN bonus3 SMALLINT;
SELECT * FROM emp;

name,salary,bonus,bonus2,bonus3


In [0]:
{"id":1,"firstname":"James ","middlename":"","lastname":"Smith","dob_year":2018,"dob_month":1,"gender":"M","salary":3000}

{'id': 1,
 'firstname': 'James ',
 'middlename': '',
 'lastname': 'Smith',
 'dob_year': 2018,
 'dob_month': 1,
 'gender': 'M',
 'salary': 3000}

In [0]:
{"id":2,"firstname":"Michael ","middlename":"Rose","lastname":"","dob_year":2010,"dob_month":3,"gender":"M","salary":4000}

{'id': 2,
 'firstname': 'Michael ',
 'middlename': 'Rose',
 'lastname': '',
 'dob_year': 2010,
 'dob_month': 3,
 'gender': 'M',
 'salary': 4000}

In [0]:
{"id":3,"firstname":"Robert ","middlename":"","lastname":"Williams","dob_year":2010,"dob_month":3,"gender":"M","salary":4000}

{'id': 3,
 'firstname': 'Robert ',
 'middlename': '',
 'lastname': 'Williams',
 'dob_year': 2010,
 'dob_month': 3,
 'gender': 'M',
 'salary': 4000}

In [0]:
{"id":4,"firstname":"Maria ","middlename":"Anne","lastname":"Jones","dob_year":2005,"dob_month":5,"gender":"F","salary":4000}

{'id': 4,
 'firstname': 'Maria ',
 'middlename': 'Anne',
 'lastname': 'Jones',
 'dob_year': 2005,
 'dob_month': 5,
 'gender': 'F',
 'salary': 4000}

In [0]:
{"id":5,"firstname":"Jen","middlename":"Mary","lastname":"Brown","dob_year":2010,"dob_month":7,"gender":"","salary":-1}

{'id': 5,
 'firstname': 'Jen',
 'middlename': 'Mary',
 'lastname': 'Brown',
 'dob_year': 2010,
 'dob_month': 7,
 'gender': '',
 'salary': -1}

In [0]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, from_json
from pyspark.sql.types import StructType, StringType, IntegerType

# 1. Initialize Spark session
spark = SparkSession.builder.appName("local").getOrCreate()

# 2. Define schema *before* using it
schema = StructType().add("id", IntegerType())\
                     .add("firstname", StringType())\
                     .add("middlename", StringType())\
                     .add("lastname", StringType())\
                     .add("dob_year", IntegerType())\
                     .add("dob_month", IntegerType())\
                     .add("gender", StringType())\
                     .add("salary", IntegerType())

# 3. Read streaming data
df = spark.readStream \
    .format("kafka") \
    .option("kafka.bootstrap.servers", "localhost:9092") \
    .option("subscribe", "josn_data_topic") \
    .load()

# 4. Print the raw Kafka schema
df.printSchema()

# 5. Parse the JSON data
personStringDF = df.selectExpr("CAST(value AS STRING)")
personDF = personStringDF.withColumn("value", from_json(col("value"), schema)).select("value.*")

# 6. Start the streaming query
personDF.writeStream \
    .format("console") \
    .outputMode("append") \
    .trigger(availableNow=True) \
    .start() \
    .awaitTermination()


root
 |-- key: binary (nullable = true)
 |-- value: binary (nullable = true)
 |-- topic: string (nullable = true)
 |-- partition: integer (nullable = true)
 |-- offset: long (nullable = true)
 |-- timestamp: timestamp (nullable = true)
 |-- timestampType: integer (nullable = true)



25/08/12 09:06:31 Spark Server has not sent updates for Streaming Query 026f048b-9786-4a46-a921-edbf52bf378c in60 seconds, but the query is still active. Marking query as in-progress. Spark Session ID is 9f978626-f40c-4791-b2e1-18720e5a059d. This is typically not a problem.
25/08/12 09:06:35 Spark Server has not sent updates for Streaming Query 026f048b-9786-4a46-a921-edbf52bf378c in60 seconds, but the query is still active. Marking query as in-progress. Spark Session ID is 9f978626-f40c-4791-b2e1-18720e5a059d. This is typically not a problem.
25/08/12 09:06:36 Spark Server has not sent updates for Streaming Query 026f048b-9786-4a46-a921-edbf52bf378c in60 seconds, but the query is still active. Marking query as in-progress. Spark Session ID is 9f978626-f40c-4791-b2e1-18720e5a059d. This is typically not a problem.
25/08/12 09:06:40 Spark Server has not sent updates for Streaming Query 026f048b-9786-4a46-a921-edbf52bf378c in60 seconds, but the query is still active. Marking query as in-p

[0;31m---------------------------------------------------------------------------[0m
[0;31mStreamingQueryException[0m                   Traceback (most recent call last)
File [0;32m<command-8350778221659051>, line 38[0m
[1;32m     30[0m personDF [38;5;241m=[39m personStringDF[38;5;241m.[39mwithColumn([38;5;124m"[39m[38;5;124mvalue[39m[38;5;124m"[39m, from_json(col([38;5;124m"[39m[38;5;124mvalue[39m[38;5;124m"[39m), schema))[38;5;241m.[39mselect([38;5;124m"[39m[38;5;124mvalue.*[39m[38;5;124m"[39m)
[1;32m     32[0m [38;5;66;03m# 6. Start the streaming query[39;00m
[1;32m     33[0m personDF[38;5;241m.[39mwriteStream \
[1;32m     34[0m     [38;5;241m.[39mformat([38;5;124m"[39m[38;5;124mconsole[39m[38;5;124m"[39m) \
[1;32m     35[0m     [38;5;241m.[39moutputMode([38;5;124m"[39m[38;5;124mappend[39m[38;5;124m"[39m) \
[1;32m     36[0m     [38;5;241m.[39mtrigger(availableNow[38;5;241m=[39m[38;5;28;01mTrue[39;00m) \
[1;32m     