In [None]:
pip install pyspark



In [3]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, when

# Initialize a SparkSession
spark = SparkSession.builder.appName("CRUD Operations").getOrCreate()

# Example data to simulate a table
data = [
    (1, "nishit", 21),
    (2, "jay", 23),
    (3, "Deep", 22),
    (4, "Danish", 25)
]

# Create schema
columns = ["id", "name", "age"]

# Create a DataFrame (This simulates the 'Create' operation for the table)
df = spark.createDataFrame(data, columns)

# 1. CREATE - Show the initial table
print("Initial Table:")
df.show()

# 2. READ - Read from the table (Select specific columns)
print("Read Operation (Select 'name' and 'age'):")
df.select("name", "age").show()

# 3. UPDATE - Let's update the 'age' of a person with 'id' = 2 (Bob)
print("Update jay age to 32:")
df_update = df.withColumn("age", when(col("id") == 2, 28).otherwise(col("age")))
df_update.show()

# 4. DELETE - Let's delete the row where the name is 'Danny'
print("Delete operation (Remove Danish):")
df_delete = df_update.filter(col("name") != "Danish")
df_delete.show()




Initial Table:
+---+------+---+
| id|  name|age|
+---+------+---+
|  1|nishit| 21|
|  2|   jay| 23|
|  3|  Deep| 22|
|  4|Danish| 25|
+---+------+---+

Read Operation (Select 'name' and 'age'):
+------+---+
|  name|age|
+------+---+
|nishit| 21|
|   jay| 23|
|  Deep| 22|
|Danish| 25|
+------+---+

Update jay age to 32:
+---+------+---+
| id|  name|age|
+---+------+---+
|  1|nishit| 21|
|  2|   jay| 28|
|  3|  Deep| 22|
|  4|Danish| 25|
+---+------+---+

Delete operation (Remove Danish):
+---+------+---+
| id|  name|age|
+---+------+---+
|  1|nishit| 21|
|  2|   jay| 28|
|  3|  Deep| 22|
+---+------+---+



In [5]:
# Example 1: Basic SELECT statement
df.select("name", "age").show()

# Example 2: WHERE clause with a condition
df.filter(df["age"] > 22).show()


# Example 3: Using multiple conditions
df.filter((df["age"] > 20) & (df["name"].like("n%"))).show()

# Example 4: ORDER BY clause
df.orderBy("age", ascending=False).show()

# Example 5: GROUP BY and aggregate functions
df.groupBy("age").count().show()


# Example 6: Using when() and otherwise() for conditional updates (similar to the previous update operation)
df_updated = df.withColumn("age_group",
                           when(col("age") < 23, "Young")
                           .when(col("age") < 25, "Mid")
                           .otherwise("Old"))

df_updated.show()
# Stopping the Spark session
spark.stop()

+------+---+
|  name|age|
+------+---+
|nishit| 21|
|   jay| 23|
|  Deep| 22|
|Danish| 25|
+------+---+

+---+------+---+
| id|  name|age|
+---+------+---+
|  2|   jay| 23|
|  4|Danish| 25|
+---+------+---+

+---+------+---+
| id|  name|age|
+---+------+---+
|  1|nishit| 21|
+---+------+---+

+---+------+---+
| id|  name|age|
+---+------+---+
|  4|Danish| 25|
|  2|   jay| 23|
|  3|  Deep| 22|
|  1|nishit| 21|
+---+------+---+

+---+-----+
|age|count|
+---+-----+
| 21|    1|
| 23|    1|
| 22|    1|
| 25|    1|
+---+-----+

+---+------+---+---------+
| id|  name|age|age_group|
+---+------+---+---------+
|  1|nishit| 21|    Young|
|  2|   jay| 23|      Mid|
|  3|  Deep| 22|    Young|
|  4|Danish| 25|      Old|
+---+------+---+---------+



In [1]:

import pyspark
from pyspark.sql import SparkSession
spark = SparkSession.builder.master("local[1]") \
                    .appName('SparkByExamples.com') \
                    .getOrCreate()

data = [("James","Smith","IND","MH"),("Michael","Rose","IND","MP"), \
    ("Robert","Williams","IND","UP"),("Maria","Jones","IND","TN") \
  ]
columns=["firstname","lastname","country","state"]
df=spark.createDataFrame(data=data,schema=columns)
df.show()
print(df.collect())


states1=df.rdd.map(lambda x: x[3]).collect()
print(states1)
from collections import OrderedDict
res = list(OrderedDict.fromkeys(states1))
print(res)



#Example 2
states2=df.rdd.map(lambda x: x.state).collect()
print(states2)

states3=df.select(df.state).collect()
print(states3)

states4=df.select(df.state).rdd.flatMap(lambda x: x).collect()
print(states4)

states5=df.select(df.state).toPandas()['state']
states6=list(states5)
print(states6)

pandDF=df.select(df.state,df.firstname).toPandas()
print(list(pandDF['state']))
print(list(pandDF['firstname']))

+---------+--------+-------+-----+
|firstname|lastname|country|state|
+---------+--------+-------+-----+
|    James|   Smith|    IND|   MH|
|  Michael|    Rose|    IND|   MP|
|   Robert|Williams|    IND|   UP|
|    Maria|   Jones|    IND|   TN|
+---------+--------+-------+-----+

[Row(firstname='James', lastname='Smith', country='IND', state='MH'), Row(firstname='Michael', lastname='Rose', country='IND', state='MP'), Row(firstname='Robert', lastname='Williams', country='IND', state='UP'), Row(firstname='Maria', lastname='Jones', country='IND', state='TN')]
['MH', 'MP', 'UP', 'TN']
['MH', 'MP', 'UP', 'TN']
['MH', 'MP', 'UP', 'TN']
[Row(state='MH'), Row(state='MP'), Row(state='UP'), Row(state='TN')]
['MH', 'MP', 'UP', 'TN']
['MH', 'MP', 'UP', 'TN']
['MH', 'MP', 'UP', 'TN']
['James', 'Michael', 'Robert', 'Maria']
