In [1]:
#Importing the packages
from pyspark.sql import SparkSession # sstarting point for writing spark apps.
from pyspark.sql.types import StructType, StructField, IntegerType, StringType # Define the structure of df.
import pyspark.sql.functions as func

In [2]:
#Creating the SparkSession
spark = SparkSession.builder.appName("FirstApp").getOrCreate()

In [3]:
#Defining schema for your DataFrame
myschema = StructType([\
                       StructField("userID", IntegerType(), True),
                       StructField("name", StringType(), True),
                       StructField("age",IntegerType(), True),
                       StructField("friends",IntegerType(), True),
                        ])

In [4]:
#Creating DataFrame on a CSV file
people = spark.read.format("csv")\
    .schema(myschema)\
    .option("path","fakefriends.csv")\
    .load() # loads the data from the specified CSV file into a DataFrame (people).

In [10]:
#Performing all thetransformations
output = people.select(people.userID,people.name\
                       ,people.age,people.friends)\
         .where(people.age < 30).withColumn('insert_timestamp', func.current_timestamp())\
         .orderBy(people.userID)

print(output.show(10))

+------+--------+---+-------+--------------------+
|userID|    name|age|friends|    insert_timestamp|
+------+--------+---+-------+--------------------+
|     1|Jean-Luc| 26|      2|2024-08-25 13:19:...|
|     9|    Hugh| 27|    181|2024-08-25 13:19:...|
|    16|  Weyoun| 22|    323|2024-08-25 13:19:...|
|    21|   Miles| 19|    268|2024-08-25 13:19:...|
|    24|  Julian| 25|      1|2024-08-25 13:19:...|
|    25|     Ben| 21|    445|2024-08-25 13:19:...|
|    26|  Julian| 22|    100|2024-08-25 13:19:...|
|    32|     Nog| 26|    281|2024-08-25 13:19:...|
|    35| Beverly| 27|    305|2024-08-25 13:19:...|
|    46|    Morn| 25|     96|2024-08-25 13:19:...|
+------+--------+---+-------+--------------------+
only showing top 10 rows

None


In [11]:
#taking the count of o/p DataFrame
output.count()

112

In [7]:
#Creating a Temp View
output.createOrReplaceTempView("peoples") # If a view with the same name already exists, it will be replaced.

In [8]:
#Running a simple Spark SQL query
spark.sql("select name,age,friends,insert_ts from peoples").show()

+--------+---+-------+--------------------+
|    name|age|friends|           insert_ts|
+--------+---+-------+--------------------+
|Jean-Luc| 26|      2|2024-08-25 13:14:...|
|    Hugh| 27|    181|2024-08-25 13:14:...|
|  Weyoun| 22|    323|2024-08-25 13:14:...|
|   Miles| 19|    268|2024-08-25 13:14:...|
|  Julian| 25|      1|2024-08-25 13:14:...|
|     Ben| 21|    445|2024-08-25 13:14:...|
|  Julian| 22|    100|2024-08-25 13:14:...|
|     Nog| 26|    281|2024-08-25 13:14:...|
| Beverly| 27|    305|2024-08-25 13:14:...|
|    Morn| 25|     96|2024-08-25 13:14:...|
|   Brunt| 24|     49|2024-08-25 13:14:...|
|     Nog| 20|      1|2024-08-25 13:14:...|
| Beverly| 19|    269|2024-08-25 13:14:...|
|   Brunt| 19|      5|2024-08-25 13:14:...|
|  Geordi| 20|    100|2024-08-25 13:14:...|
|  Geordi| 21|    477|2024-08-25 13:14:...|
|  Kasidy| 22|    179|2024-08-25 13:14:...|
|   Brunt| 20|    384|2024-08-25 13:14:...|
|     Ben| 28|    311|2024-08-25 13:14:...|
|    Worf| 24|    492|2024-08-25