In [1]:
from pyspark.sql import SparkSession

In [2]:
from pyspark.sql.types import StructType,StructField,IntegerType,StringType
import pyspark.sql.functions as func 

In [3]:
spark = SparkSession.builder.appName("SparkFirstApp").getOrCreate()

In [21]:
myschema = StructType([\
                       StructField("userID", IntegerType(),True),
                       StructField("name", StringType(),True), 
                       StructField("age", IntegerType(),True),
                       StructField("friends", IntegerType(),True)
                       ])

In [22]:
#read csv file into dataframe with schema as myschema
people = spark.read.format("csv")\
         .schema(myschema)\
         .option("path","fakefriends.csv")\
         .load()

In [23]:
people.count()

500

In [24]:
people.printSchema()

root
 |-- userID: integer (nullable = true)
 |-- name: string (nullable = true)
 |-- age: integer (nullable = true)
 |-- friends: integer (nullable = true)



In [25]:
output = people.select(people.userID,people.name\
                       ,people.age,people.friends)\
        .where(people.age>30).withColumn('insert_ts',func.current_timestamp())\
        .orderBy(people.userID)

In [26]:
output.count()

377

In [11]:
output.show()

+------+--------+---+-------+--------------------+
|userID|    name|age|friends|           insert_ts|
+------+--------+---+-------+--------------------+
|     0|    Will| 33|    385|2024-07-07 08:43:...|
|     2|    Hugh| 55|    221|2024-07-07 08:43:...|
|     3|  Deanna| 40|    465|2024-07-07 08:43:...|
|     4|   Quark| 68|     21|2024-07-07 08:43:...|
|     5|  Weyoun| 59|    318|2024-07-07 08:43:...|
|     6|  Gowron| 37|    220|2024-07-07 08:43:...|
|     7|    Will| 54|    307|2024-07-07 08:43:...|
|     8|  Jadzia| 38|    380|2024-07-07 08:43:...|
|    10|     Odo| 53|    191|2024-07-07 08:43:...|
|    11|     Ben| 57|    372|2024-07-07 08:43:...|
|    12|   Keiko| 54|    253|2024-07-07 08:43:...|
|    13|Jean-Luc| 56|    444|2024-07-07 08:43:...|
|    14|    Hugh| 43|     49|2024-07-07 08:43:...|
|    15|     Rom| 36|     49|2024-07-07 08:43:...|
|    17|     Odo| 35|     13|2024-07-07 08:43:...|
|    18|Jean-Luc| 45|    455|2024-07-07 08:43:...|
|    19|  Geordi| 60|    246|20

In [27]:
output.createOrReplaceTempView('peoples')

In [42]:
## use filter operation
output.filter('age>=50').show()

+------+--------+---+-------+--------------------+
|userID|    name|age|friends|           insert_ts|
+------+--------+---+-------+--------------------+
|     2|    Hugh| 55|    221|2024-07-08 23:24:...|
|     4|   Quark| 68|     21|2024-07-08 23:24:...|
|     5|  Weyoun| 59|    318|2024-07-08 23:24:...|
|     7|    Will| 54|    307|2024-07-08 23:24:...|
|    10|     Odo| 53|    191|2024-07-08 23:24:...|
|    11|     Ben| 57|    372|2024-07-08 23:24:...|
|    12|   Keiko| 54|    253|2024-07-08 23:24:...|
|    13|Jean-Luc| 56|    444|2024-07-08 23:24:...|
|    19|  Geordi| 60|    246|2024-07-08 23:24:...|
|    20|     Odo| 67|    220|2024-07-08 23:24:...|
|    23|   Keiko| 51|    271|2024-07-08 23:24:...|
|    30|   Keiko| 50|    175|2024-07-08 23:24:...|
|    33|   Dukat| 53|    197|2024-07-08 23:24:...|
|    37|  Geordi| 58|     21|2024-07-08 23:24:...|
|    38|  Deanna| 64|     65|2024-07-08 23:24:...|
|    40|     Odo| 52|    413|2024-07-08 23:24:...|
|    41|    Hugh| 67|    167|20

In [45]:
## another way filter operation
output.filter(output['age']>=50).select(['name','age']).show()

+--------+---+
|    name|age|
+--------+---+
|    Hugh| 55|
|   Quark| 68|
|  Weyoun| 59|
|    Will| 54|
|     Odo| 53|
|     Ben| 57|
|   Keiko| 54|
|Jean-Luc| 56|
|  Geordi| 60|
|     Odo| 67|
|   Keiko| 51|
|   Keiko| 50|
|   Dukat| 53|
|  Geordi| 58|
|  Deanna| 64|
|     Odo| 52|
|    Hugh| 67|
|   Brunt| 54|
|  Guinan| 58|
|   Dukat| 52|
+--------+---+
only showing top 20 rows



In [50]:
## add multiple conditions in filter operation
output.filter((output['age']>=50) & (output['friends']>=250)).select(['name','age']).show()

+--------+---+
|    name|age|
+--------+---+
|  Weyoun| 59|
|    Will| 54|
|     Ben| 57|
|   Keiko| 54|
|Jean-Luc| 56|
|   Keiko| 51|
|     Odo| 52|
|  Guinan| 58|
|   Quark| 51|
|     Odo| 57|
|    Morn| 59|
|  Kasidy| 62|
|   Brunt| 52|
|    Data| 61|
|   Leeta| 58|
|   Dukat| 67|
|  Jadzia| 54|
|    Hugh| 57|
|   Quark| 66|
|    Hugh| 55|
+--------+---+
only showing top 20 rows



In [28]:
spark.sql("select name,age,friends,insert_ts from peoples").show()

+--------+---+-------+--------------------+
|    name|age|friends|           insert_ts|
+--------+---+-------+--------------------+
|    Will| 33|    385|2024-07-08 23:03:...|
|    Hugh| 55|    221|2024-07-08 23:03:...|
|  Deanna| 40|    465|2024-07-08 23:03:...|
|   Quark| 68|     21|2024-07-08 23:03:...|
|  Weyoun| 59|    318|2024-07-08 23:03:...|
|  Gowron| 37|    220|2024-07-08 23:03:...|
|    Will| 54|    307|2024-07-08 23:03:...|
|  Jadzia| 38|    380|2024-07-08 23:03:...|
|     Odo| 53|    191|2024-07-08 23:03:...|
|     Ben| 57|    372|2024-07-08 23:03:...|
|   Keiko| 54|    253|2024-07-08 23:03:...|
|Jean-Luc| 56|    444|2024-07-08 23:03:...|
|    Hugh| 43|     49|2024-07-08 23:03:...|
|     Rom| 36|     49|2024-07-08 23:03:...|
|     Odo| 35|     13|2024-07-08 23:03:...|
|Jean-Luc| 45|    455|2024-07-08 23:03:...|
|  Geordi| 60|    246|2024-07-08 23:03:...|
|     Odo| 67|    220|2024-07-08 23:03:...|
|   Keiko| 51|    271|2024-07-08 23:03:...|
|   Leeta| 42|    363|2024-07-08

In [32]:
## add new column
output.withColumn("age after two year", output['age']+2).show()

+------+--------+---+-------+--------------------+------------------+
|userID|    name|age|friends|           insert_ts|age after two year|
+------+--------+---+-------+--------------------+------------------+
|     0|    Will| 33|    385|2024-07-08 23:04:...|                35|
|     2|    Hugh| 55|    221|2024-07-08 23:04:...|                57|
|     3|  Deanna| 40|    465|2024-07-08 23:04:...|                42|
|     4|   Quark| 68|     21|2024-07-08 23:04:...|                70|
|     5|  Weyoun| 59|    318|2024-07-08 23:04:...|                61|
|     6|  Gowron| 37|    220|2024-07-08 23:04:...|                39|
|     7|    Will| 54|    307|2024-07-08 23:04:...|                56|
|     8|  Jadzia| 38|    380|2024-07-08 23:04:...|                40|
|    10|     Odo| 53|    191|2024-07-08 23:04:...|                55|
|    11|     Ben| 57|    372|2024-07-08 23:04:...|                59|
|    12|   Keiko| 54|    253|2024-07-08 23:04:...|                56|
|    13|Jean-Luc| 56

In [35]:
## rename column
output.withColumnRenamed('friends','my friends').show()


+------+--------+---+----------+--------------------+
|userID|    name|age|my friends|           insert_ts|
+------+--------+---+----------+--------------------+
|     0|    Will| 33|       385|2024-07-08 23:07:...|
|     2|    Hugh| 55|       221|2024-07-08 23:07:...|
|     3|  Deanna| 40|       465|2024-07-08 23:07:...|
|     4|   Quark| 68|        21|2024-07-08 23:07:...|
|     5|  Weyoun| 59|       318|2024-07-08 23:07:...|
|     6|  Gowron| 37|       220|2024-07-08 23:07:...|
|     7|    Will| 54|       307|2024-07-08 23:07:...|
|     8|  Jadzia| 38|       380|2024-07-08 23:07:...|
|    10|     Odo| 53|       191|2024-07-08 23:07:...|
|    11|     Ben| 57|       372|2024-07-08 23:07:...|
|    12|   Keiko| 54|       253|2024-07-08 23:07:...|
|    13|Jean-Luc| 56|       444|2024-07-08 23:07:...|
|    14|    Hugh| 43|        49|2024-07-08 23:07:...|
|    15|     Rom| 36|        49|2024-07-08 23:07:...|
|    17|     Odo| 35|        13|2024-07-08 23:07:...|
|    18|Jean-Luc| 45|       

In [36]:
output.withColumnRenamed('my friends','friends').show()

+------+--------+---+-------+--------------------+
|userID|    name|age|friends|           insert_ts|
+------+--------+---+-------+--------------------+
|     0|    Will| 33|    385|2024-07-08 23:08:...|
|     2|    Hugh| 55|    221|2024-07-08 23:08:...|
|     3|  Deanna| 40|    465|2024-07-08 23:08:...|
|     4|   Quark| 68|     21|2024-07-08 23:08:...|
|     5|  Weyoun| 59|    318|2024-07-08 23:08:...|
|     6|  Gowron| 37|    220|2024-07-08 23:08:...|
|     7|    Will| 54|    307|2024-07-08 23:08:...|
|     8|  Jadzia| 38|    380|2024-07-08 23:08:...|
|    10|     Odo| 53|    191|2024-07-08 23:08:...|
|    11|     Ben| 57|    372|2024-07-08 23:08:...|
|    12|   Keiko| 54|    253|2024-07-08 23:08:...|
|    13|Jean-Luc| 56|    444|2024-07-08 23:08:...|
|    14|    Hugh| 43|     49|2024-07-08 23:08:...|
|    15|     Rom| 36|     49|2024-07-08 23:08:...|
|    17|     Odo| 35|     13|2024-07-08 23:08:...|
|    18|Jean-Luc| 45|    455|2024-07-08 23:08:...|
|    19|  Geordi| 60|    246|20

In [38]:
output.drop('age after two years').show()

+------+--------+---+-------+--------------------+
|userID|    name|age|friends|           insert_ts|
+------+--------+---+-------+--------------------+
|     0|    Will| 33|    385|2024-07-08 23:08:...|
|     2|    Hugh| 55|    221|2024-07-08 23:08:...|
|     3|  Deanna| 40|    465|2024-07-08 23:08:...|
|     4|   Quark| 68|     21|2024-07-08 23:08:...|
|     5|  Weyoun| 59|    318|2024-07-08 23:08:...|
|     6|  Gowron| 37|    220|2024-07-08 23:08:...|
|     7|    Will| 54|    307|2024-07-08 23:08:...|
|     8|  Jadzia| 38|    380|2024-07-08 23:08:...|
|    10|     Odo| 53|    191|2024-07-08 23:08:...|
|    11|     Ben| 57|    372|2024-07-08 23:08:...|
|    12|   Keiko| 54|    253|2024-07-08 23:08:...|
|    13|Jean-Luc| 56|    444|2024-07-08 23:08:...|
|    14|    Hugh| 43|     49|2024-07-08 23:08:...|
|    15|     Rom| 36|     49|2024-07-08 23:08:...|
|    17|     Odo| 35|     13|2024-07-08 23:08:...|
|    18|Jean-Luc| 45|    455|2024-07-08 23:08:...|
|    19|  Geordi| 60|    246|20

In [36]:
spark.catalog.listDatabases()

[Database(name='default', catalog='spark_catalog', description='default database', locationUri='file:/C:/Users/HP/OneDrive/Desktop/nucleusteq%2520training/pyspark/spark-warehouse')]

In [43]:
spark.catalog.listTables()

[Table(name='peoples', catalog=None, namespace=[], description=None, tableType='TEMPORARY', isTemporary=True)]

In [22]:
spark.catalog.dropGlobalTempView("peoples")

False

In [None]:
output.write \
.format("json").mode("overwrite") \
.option("path","output/op/") \
.partitionBy("age") \
.save()
