<a href="https://colab.research.google.com/github/Yashwanth-1406/Pyspark-yashwanth/blob/main/Rdd.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [1]:
from pyspark import SparkContext

sc = SparkContext("local", "EvenOddExample")

# Create an RDD with numbers from 1 to 10
numbers_rdd = sc.parallelize(range(1, 11))

# Filter even numbers
even_rdd = numbers_rdd.filter(lambda x: x % 2 == 0)

# Filter odd numbers
odd_rdd = numbers_rdd.filter(lambda x: x % 2 != 0)

# Collect and print one even and one odd number
print("One Even Number:", even_rdd.take(1))  # Example: [2]
print("One Odd Number:", odd_rdd.take(1))    # Example: [1]


One Even Number: [2]
One Odd Number: [1]


In [2]:
data = [
    (1, "John", "HR", 5000),
    (2, "Jane", "IT", 8000),
    (3, "Mike", "IT", 6000),
    (4, "Sara", "Finance", 7000),
    (5, "David", "HR", 5500)
]

from pyspark.sql import SparkSession

# Initialize SparkSession
spark = SparkSession.builder.appName("DataFrameOperations").getOrCreate()

# Define column names
columns = ["ID", "Name", "Department", "Salary"]

# Create a DataFrame from the sample data
df = spark.createDataFrame(data, columns)

# Show the DataFrame
df.show()

+---+-----+----------+------+
| ID| Name|Department|Salary|
+---+-----+----------+------+
|  1| John|        HR|  5000|
|  2| Jane|        IT|  8000|
|  3| Mike|        IT|  6000|
|  4| Sara|   Finance|  7000|
|  5|David|        HR|  5500|
+---+-----+----------+------+



In [4]:
df.select("Name","Salary").show()

+-----+------+
| Name|Salary|
+-----+------+
| John|  5000|
| Jane|  8000|
| Mike|  6000|
| Sara|  7000|
|David|  5500|
+-----+------+



In [8]:
from pyspark.sql.functions import col

# Add new column: Salary_After_Tax = Salary * 0.8 (i.e., 20% tax deducted)
df_with_tax = df.withColumn("Salary_After_Tax", col("Salary") * 0.8)

# Now sort by Salary_After_Tax descending
df_with_tax.sort(col("Salary_After_Tax").desc()).show()


+---+-----+----------+------+----------------+
| ID| Name|Department|Salary|Salary_After_Tax|
+---+-----+----------+------+----------------+
|  2| Jane|        IT|  8000|          6400.0|
|  4| Sara|   Finance|  7000|          5600.0|
|  3| Mike|        IT|  6000|          4800.0|
|  5|David|        HR|  5500|          4400.0|
|  1| John|        HR|  5000|          4000.0|
+---+-----+----------+------+----------------+



In [10]:
from pyspark.sql import Row
from pyspark.sql import SparkSession

# Initialize SparkSession
spark = SparkSession.builder.appName("DataFrameOperations").getOrCreate()

# Get SparkContext from SparkSession
sc = spark.sparkContext

# Create RDD of Row objects
rdd = sc.parallelize([Row(name="Alice", age=25), Row(name="Bob", age=30)])

# Convert RDD to DataFrame
df = spark.createDataFrame(rdd)

# Filter rows where age > 25
filtered_df = df.filter(df.age > 25)

# Select 'name' column from filtered DataFrame
filtered_df.select("name").show()


+----+
|name|
+----+
| Bob|
+----+



In [11]:
from pyspark.sql import SparkSession

spark = SparkSession.builder.appName("SparkSQLBasics").getOrCreate()

data = [
    (1, "Alice", "Sales", 3000),
    (2, "Bob", "IT", 4000),
    (3, "Cathy", "HR", 3500),
    (4, "David", "Sales", 4500),
    (5, "Eva", "IT", 4200)
]
columns = ["EmpID", "Name", "Department", "Salary"]

df = spark.createDataFrame(data, columns)
df.show()

+-----+-----+----------+------+
|EmpID| Name|Department|Salary|
+-----+-----+----------+------+
|    1|Alice|     Sales|  3000|
|    2|  Bob|        IT|  4000|
|    3|Cathy|        HR|  3500|
|    4|David|     Sales|  4500|
|    5|  Eva|        IT|  4200|
+-----+-----+----------+------+



In [12]:
data = [
    (1, "Alice", "Sales", 3000),
    (2, "Bob", "IT", 4000),
    (3, "Cathy", "HR", 3500)
]
columns = ["EmpID", "Name", "Department", "Salary"]
df = spark.createDataFrame(data, columns)
df.show()

+-----+-----+----------+------+
|EmpID| Name|Department|Salary|
+-----+-----+----------+------+
|    1|Alice|     Sales|  3000|
|    2|  Bob|        IT|  4000|
|    3|Cathy|        HR|  3500|
+-----+-----+----------+------+



In [2]:
import random
import csv

# Generate 30 records with random data
names = ["John", "Jane", "Mike", "Sara", "David", "Emily", "George", "Nina", "Tom", "Anna"]
departments = ["Sales", "IT", "HR", "Finance", "Marketing"]
salaries = [3000, 4000, 5000, 6000, 7000, 8000, 9000, 10000]

# Create and open a CSV file for writing
with open('employee_data.csv', mode='w', newline='') as file:
    writer = csv.writer(file)

    # Write the header
    writer.writerow(["ID", "Name", "Department", "Salary"])

    # Write the 30 records
    for i in range(1, 31):
        name = random.choice(names)
        department = random.choice(departments)
        salary = random.choice(salaries)
        writer.writerow([i, name, department, salary])

print("CSV file 'employee_data.csv' has been generated successfully.")


CSV file 'employee_data.csv' has been generated successfully.


In [3]:
from pyspark.sql.types import StringType,StructType,IntegerType

schema = StructType() \
      .add("EmpID", IntegerType()) \
      .add("Name", StringType()) \
      .add("Department",StringType())\
      .add("Salary", IntegerType())
print(schema)

StructType([StructField('EmpID', IntegerType(), True), StructField('Name', StringType(), True), StructField('Department', StringType(), True), StructField('Salary', IntegerType(), True)])


In [7]:
from pyspark.sql import SparkSession
spark = SparkSession.builder.appName("sparkSQLBasics").getOrCreate()
stream_df=spark.readStream \
   .option("sep",",") \
   .schema(schema) \
   .csv("/content/empdata/")

stream_df.isStreaming

True

In [12]:
from pyspark.sql.functions import upper
transformed_df = stream_df.withColumn("NameUPPER",upper("Name"))

In [15]:
query = transformed_df.writeStream \
  .outputMode("append") \
  .format("console") \
  .start()

In [9]:
%%bash
cat<<EOF > /content/empdata.employee_data2.csv
1,John,Sales,3000
2,Jane,IT,4000
3,Mike,Sales,5000
4,Sara,Finance,6000
5,David,HR,7000
6,Emily,Marketing,6000
7,George,HR,4000
8,Nina,Sales,5000
9,Tom,IT,8000
10,Anna,Marketing,3000
EOF

In [14]:
query = transformed_df.writeStream \
    .outputMode("append") \
    .format("json") \
    .option("path", "/tmp/stream_json_output") \
    .option("checkpointLocation", "/tmp/stream_checkpoint") \
    .start()

In [16]:
df = spark.read.json("/tmp/stream_json_output")
df.show()

+----------+-----+------+---------+------+
|Department|EmpID|  Name|NameUPPER|Salary|
+----------+-----+------+---------+------+
|Department| NULL|  Name|     NAME|  NULL|
| Marketing|    1|   Tom|      TOM| 10000|
|     Sales|    2|George|   GEORGE| 10000|
| Marketing|    3|  Jane|     JANE| 10000|
|        HR|    4|  John|     JOHN|  4000|
| Marketing|    5|  Nina|     NINA|  9000|
|        HR|    6| Emily|    EMILY|  4000|
| Marketing|    7| Emily|    EMILY|  3000|
|     Sales|    8|  Jane|     JANE|  7000|
| Marketing|    9|  Anna|     ANNA|  6000|
|        HR|   10|George|   GEORGE|  4000|
| Marketing|   11| Emily|    EMILY|  6000|
|        IT|   12|  Anna|     ANNA|  8000|
| Marketing|   13|  Sara|     SARA|  3000|
|   Finance|   14|  Anna|     ANNA|  3000|
| Marketing|   15|  Jane|     JANE|  3000|
|     Sales|   16|George|   GEORGE|  3000|
|        HR|   17|  Nina|     NINA|  4000|
|        HR|   18|George|   GEORGE|  9000|
|     Sales|   19|  John|     JOHN|  6000|
+----------

In [17]:
pip install faker

Collecting faker
  Downloading faker-37.4.2-py3-none-any.whl.metadata (15 kB)
Downloading faker-37.4.2-py3-none-any.whl (1.9 MB)
[?25l   [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m0.0/1.9 MB[0m [31m?[0m eta [36m-:--:--[0m[2K   [91m━━━━━[0m[91m╸[0m[90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m0.3/1.9 MB[0m [31m9.2 MB/s[0m eta [36m0:00:01[0m[2K   [91m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m[91m╸[0m [32m1.9/1.9 MB[0m [31m31.5 MB/s[0m eta [36m0:00:01[0m[2K   [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m1.9/1.9 MB[0m [31m22.3 MB/s[0m eta [36m0:00:00[0m
[?25hInstalling collected packages: faker
Successfully installed faker-37.4.2


In [None]:
from faker import Faker
import random
import time
import uuid

# Create Faker instance
fake = Faker()

# Output file
output_file = "/content/empdata/random_data.csv"

while True:
    emp_id = random.randint(1000, 9999)      # Random Employee ID
    name = fake.name()                       # Random Name
    department = fake.job()                  # Random Job Title as Department
    salary = random.randint(3000, 12000)     # Random Salary
    unique_id = uuid.uuid4()                 # Unique ID for no repetition

    record = f"{emp_id},{name},{department},{salary},{unique_id}\n"

    # Append to file
    with open(output_file, "a") as f:
        f.write(record)

    print("Generated:", record.strip())

    time.sleep(1)  # Wait 1 second before generating next line

Generated: 9056,Michael Richmond,Farm manager,7089,d02da143-bffd-4ce1-801d-84ee058d1386
Generated: 3826,Dr. Phillip Davis,Public relations officer,7174,8b4a1101-d2fc-4676-9e90-faeb091f1214
Generated: 3580,Raymond Smith,Social worker,9100,600b220f-52aa-4104-ad0f-9c593adf3d62
Generated: 2569,Andrew Butler,Airline pilot,4565,aff7eca1-23c7-4cef-a0f3-ed7b7a1a6584
Generated: 3485,David Ford,Medical secretary,7772,068a02a0-d26e-4ebd-b237-2c82698f690d
Generated: 6537,Christopher Garrison,Chartered public finance accountant,4137,a7eb83b3-ecfd-4805-97d1-a7b4dc7ab00f
Generated: 1554,Justin Russell,Engineer, electrical,8344,6daab10d-25a6-4f96-b374-bdc10c52a17a
Generated: 9201,Daniel Stephenson,Cytogeneticist,7717,d140da8f-564e-4825-9812-1321cdba6ef2
Generated: 7398,Nicole Boyd,Mudlogger,10004,7555a475-fcf9-4046-b1fe-bde259cfd246
Generated: 3046,Debra Harrison,Meteorologist,5314,8e8be89a-b6dc-4b86-a2d6-5ff7928c75ab
Generated: 5257,Alexander Gomez,Presenter, broadcasting,8756,660406b8-0399-4b08-907b

In [6]:
import os
os.makedirs('/content/empdata', exist_ok=True)
os.rename('/content/employee_data.csv', '/content/empdata/employee_data.csv')