In [23]:
import findspark
findspark.init()
from pyspark.sql import SparkSession
from pyspark.sql.functions import *
#Initilize Sparksession
spark = SparkSession.builder.getOrCreate()
sc=spark.sparkContext
test_df = [("James","Sales","NY",90000,34,10000),
    ("Michael","Sales","NY",86000,56,20000),
    ("Robert","Sales","CA",81000,30,23000),
    ("Maria","Finance","CA",90000,24,23000),
    ("Raman","Finance","CA",99000,40,24000),
    ("Scott","Finance","NY",83000,36,19000),
    ("Jen","Finance","NY",79000,53,15000),
    ("Jeff","Marketing","CA",80000,25,18000),
    ("Kumar","Marketing","NY",91000,50,21000)
  ]

In [24]:
ud_scehma = ["employee_name","department","state","salary","age","bonus"]

df = spark.createDataFrame(data=test_df,schema = ud_scehma)

In [25]:
df.cache().count()

9

In [26]:
df.show()

+-------------+----------+-----+------+---+-----+
|employee_name|department|state|salary|age|bonus|
+-------------+----------+-----+------+---+-----+
|        James|     Sales|   NY| 90000| 34|10000|
|      Michael|     Sales|   NY| 86000| 56|20000|
|       Robert|     Sales|   CA| 81000| 30|23000|
|        Maria|   Finance|   CA| 90000| 24|23000|
|        Raman|   Finance|   CA| 99000| 40|24000|
|        Scott|   Finance|   NY| 83000| 36|19000|
|          Jen|   Finance|   NY| 79000| 53|15000|
|         Jeff| Marketing|   CA| 80000| 25|18000|
|        Kumar| Marketing|   NY| 91000| 50|21000|
+-------------+----------+-----+------+---+-----+



In [27]:
from pyspark import StorageLevel

In [28]:
test = StorageLevel(useDisk=False, useMemory=True,useOffHeap=False, deserialized=False, replication=2)

In [29]:
df.persist(storageLevel=test)

23/09/25 04:45:36 WARN CacheManager: Asked to cache already cached data.


DataFrame[employee_name: string, department: string, state: string, salary: bigint, age: bigint, bonus: bigint]

In [30]:
df.unpersist()

DataFrame[employee_name: string, department: string, state: string, salary: bigint, age: bigint, bonus: bigint]

In [32]:
df.groupBy("state").max("salary").alias("max_sal_by_state").show()

[Stage 10:>                                                         (0 + 2) / 2]

+-----+-----------+
|state|max(salary)|
+-----+-----------+
|   CA|      99000|
|   NY|      91000|
+-----+-----------+



                                                                                

In [33]:
df.groupBy("department").max("salary").alias("max_sal_by_state").show()

+----------+-----------+
|department|max(salary)|
+----------+-----------+
|     Sales|      90000|
|   Finance|      99000|
| Marketing|      91000|
+----------+-----------+



In [39]:
df.groupBy("department","state").agg(sum("salary").alias("sum_salary"),
                            avg("salary").alias("avg_salary")) \
.where(col("avg_salary")>80000).show()

+----------+-----+----------+----------+
|department|state|sum_salary|avg_salary|
+----------+-----+----------+----------+
|     Sales|   CA|     81000|   81000.0|
|   Finance|   CA|    189000|   94500.0|
|     Sales|   NY|    176000|   88000.0|
|   Finance|   NY|    162000|   81000.0|
| Marketing|   NY|     91000|   91000.0|
+----------+-----+----------+----------+



In [37]:
df.createOrReplaceTempView("ida")

In [40]:
spark.sql("select * from ida")

DataFrame[employee_name: string, department: string, state: string, salary: bigint, age: bigint, bonus: bigint]

In [41]:
df.write.saveAsTable("emp_tbl")

                                                                                

In [42]:
test = spark.sql("DESCRIBE emp_tbl")

In [43]:
test.show()

+-------------+---------+-------+
|     col_name|data_type|comment|
+-------------+---------+-------+
|employee_name|   string|   null|
|   department|   string|   null|
|        state|   string|   null|
|       salary|   bigint|   null|
|          age|   bigint|   null|
|        bonus|   bigint|   null|
+-------------+---------+-------+



In [44]:
test01=spark.sql("DESCRIBE EXTENDED emp_tbl")
test01.show()

+--------------------+--------------------+-------+
|            col_name|           data_type|comment|
+--------------------+--------------------+-------+
|       employee_name|              string|   null|
|          department|              string|   null|
|               state|              string|   null|
|              salary|              bigint|   null|
|                 age|              bigint|   null|
|               bonus|              bigint|   null|
|                    |                    |       |
|# Detailed Table ...|                    |       |
|             Catalog|       spark_catalog|       |
|            Database|             default|       |
|               Table|             emp_tbl|       |
|        Created Time|Mon Sep 25 05:14:...|       |
|         Last Access|             UNKNOWN|       |
|          Created By|         Spark 3.4.1|       |
|                Type|             MANAGED|       |
|            Provider|             parquet|       |
|           

In [46]:
spark.sql("CREATE DATABASE idashell")

DataFrame[]

In [47]:
spark.sql("use idashell")

DataFrame[]

In [48]:
df.write.saveAsTable("test_tbl")

[Stage 26:>                                                         (0 + 2) / 2]                                                                                

In [49]:
test01=spark.sql("DESCRIBE EXTENDED test_tbl")
test01.show()

+--------------------+--------------------+-------+
|            col_name|           data_type|comment|
+--------------------+--------------------+-------+
|       employee_name|              string|   null|
|          department|              string|   null|
|               state|              string|   null|
|              salary|              bigint|   null|
|                 age|              bigint|   null|
|               bonus|              bigint|   null|
|                    |                    |       |
|# Detailed Table ...|                    |       |
|             Catalog|       spark_catalog|       |
|            Database|            idashell|       |
|               Table|            test_tbl|       |
|        Created Time|Mon Sep 25 05:22:...|       |
|         Last Access|             UNKNOWN|       |
|          Created By|         Spark 3.4.1|       |
|                Type|             MANAGED|       |
|            Provider|             parquet|       |
|           

In [51]:
df.write.option("path", "/home/labuser/Documents/my_tbl_data").saveAsTable("e_emptbl")

In [52]:
test01=spark.sql("DESCRIBE EXTENDED e_emptbl")
test01.show()

+--------------------+--------------------+-------+
|            col_name|           data_type|comment|
+--------------------+--------------------+-------+
|       employee_name|              string|   null|
|          department|              string|   null|
|               state|              string|   null|
|              salary|              bigint|   null|
|                 age|              bigint|   null|
|               bonus|              bigint|   null|
|                    |                    |       |
|# Detailed Table ...|                    |       |
|             Catalog|       spark_catalog|       |
|            Database|            idashell|       |
|               Table|            e_emptbl|       |
|        Created Time|Mon Sep 25 05:34:...|       |
|         Last Access|             UNKNOWN|       |
|          Created By|         Spark 3.4.1|       |
|                Type|            EXTERNAL|       |
|            Provider|             parquet|       |
|           

In [53]:
spark.sql("drop table default.emp_tbl")

DataFrame[]

In [54]:
spark.sql("drop table idashell.e_emptbl")

DataFrame[]

In [55]:
df.rdd.getNumPartitions()

2

In [56]:
df.write.csv("/home/labuser/documents/testcontainer")

[Stage 28:>                                                         (0 + 2) / 2]                                                                                

In [60]:
df.write.partitionBy("state","department").csv("/home/labuser/documents/testcontainer_02")

[Stage 30:>                                                         (0 + 2) / 2]                                                                                

In [61]:
def func(df):
    print(df.state)

In [62]:
df.foreach(func)

CA
NY
NY
CA
NY
NY
NY
CA
CA


In [65]:
json_data = [
    '{"name": "Alice", "age": 25}',
    '{"name": "Bob", "age": 30}',
    '{"name": "Charlie", "age": 35}'
]

In [69]:
from pyspark.sql.functions import *
from pyspark.sql.types import *
schema = StructType([
    StructField("name". StringType(), True),
    StructField("age". IntegerType(), True)
])

AttributeError: 'str' object has no attribute 'StringType'

----------------------------------------
Exception occurred during processing of request from ('127.0.0.1', 36608)
Traceback (most recent call last):
  File "/opt/anaconda3/lib/python3.11/socketserver.py", line 317, in _handle_request_noblock
    self.process_request(request, client_address)
  File "/opt/anaconda3/lib/python3.11/socketserver.py", line 348, in process_request
    self.finish_request(request, client_address)
  File "/opt/anaconda3/lib/python3.11/socketserver.py", line 361, in finish_request
    self.RequestHandlerClass(request, client_address, self)
  File "/opt/anaconda3/lib/python3.11/socketserver.py", line 755, in __init__
    self.handle()
  File "/opt/anaconda3/lib/python3.11/site-packages/pyspark/accumulators.py", line 281, in handle
    poll(accum_updates)
  File "/opt/anaconda3/lib/python3.11/site-packages/pyspark/accumulators.py", line 253, in poll
    if func():
       ^^^^^^
  File "/opt/anaconda3/lib/python3.11/site-packages/pyspark/accumulators.py", line 257

In [67]:
df = spark.read.schema(schema).json(spark.sparkContext.parallelize(json_data))

NameError: name 'schema' is not defined

In [None]:
schema = StructType([
    StructField("name", StringType(), True),
    StructField("age", IntegerType(), True),
    StructField("address", StructType([
        StructField("city", StringType(), True),
        StructField("state", StringType(), True)
    ]), True)
])


df = spark.read.schema(schema).json(spark.sparkContext.parallelize(json_data))