In [8]:
from pyspark.sql import SparkSession

spark=SparkSession.builder.appName("practice").getOrCreate()

sc=spark.sparkContext

In [12]:
citibike_df=spark.read.csv("JC-202503-citibike-tripdata.csv", header=True)

citibike_df.show(10,False)

citibike_df.printSchema()

+----------------+-------------+-----------------------+-----------------------+------------------+----------------+------------------------------------------+--------------+------------------+------------------+------------------+------------------+-------------+
|ride_id         |rideable_type|started_at             |ended_at               |start_station_name|start_station_id|end_station_name                          |end_station_id|start_lat         |start_lng         |end_lat           |end_lng           |member_casual|
+----------------+-------------+-----------------------+-----------------------+------------------+----------------+------------------------------------------+--------------+------------------+------------------+------------------+------------------+-------------+
|29DAF43DD84B4B7A|electric_bike|2025-03-20 18:58:31.217|2025-03-20 19:00:46.466|6 St & Grand St   |HB302           |Mama Johnson Field - 4 St & Jackson St    |HB404         |40.744397833095604|-74.03450086

In [14]:
citibike_df.createOrReplaceTempView("citibike")

result=spark.sql("select rideable_type, \
                        count(*) as total_ride, \
                        min (ride_id) min_ride, \
                        max(ride_id) max_ride \
                        from citibike group by rideable_type")
result.show()
##  now i want to save this result as pequet file.

result.write.mode("append").parquet("results")

+-------------+----------+----------------+----------------+
|rideable_type|total_ride|        min_ride|        max_ride|
+-------------+----------+----------------+----------------+
| classic_bike|     24847|0004A6F6F9FDFDCE|FFFBA93AF8EC7D30|
|electric_bike|     48446|000107582539997A|FFFFC22BF2B52CD8|
+-------------+----------+----------------+----------------+



In [15]:
### Saving data as json file format.

result.write.mode("append").json("results")

In [17]:
## Dropping the columns
data=[("krishna",30),("varun",10),("ram",62)]

columns=["name","age"]

df=spark.createDataFrame(data,columns)

df.show()

df.printSchema()


+-------+---+
|   name|age|
+-------+---+
|krishna| 30|
|  varun| 10|
|    ram| 62|
+-------+---+

root
 |-- name: string (nullable = true)
 |-- age: long (nullable = true)



In [18]:
from pyspark.sql.functions import expr

df2=df.withColumn("type", expr("case when age <18 then 'minor' \
                                     when age >=18 and age <=60 then 'major' \
                                     else 'sr.citizen' end"))

df2.show()

+-------+---+----------+
|   name|age|      type|
+-------+---+----------+
|krishna| 30|     major|
|  varun| 10|     minor|
|    ram| 62|sr.citizen|
+-------+---+----------+



In [19]:
## Droping column
# i want to drop the age column.
df2=df.withColumn("type", expr("case when age <18 then 'minor' \
                                     when age >=18 and age <=60 then 'major' \
                                     else 'sr.citizen' end")).drop("age")

df2.show()

+-------+----------+
|   name|      type|
+-------+----------+
|krishna|     major|
|  varun|     minor|
|    ram|sr.citizen|
+-------+----------+



In [23]:
df_drop_age = df.drop("age","name")
df_drop_age.show() ##i droped both column here

++
||
++
||
||
||
++



In [37]:
data=[('Shubham',30),('Charan',28),('Bharat',40)]
columns=["name","age"]

df=spark.createDataFrame(data,columns)

df.show()

df.write.mode("overwrite").parquet("test")

+-------+---+
|   name|age|
+-------+---+
|Shubham| 30|
| Charan| 28|
| Bharat| 40|
+-------+---+



In [38]:
data=[('Shubham',30,"Pune"),('Charan',28,"Bangalore"),('Bharat',40,"Bangalore")]
columns=["name","age","city"]

df1=spark.createDataFrame(data,columns)

df1.show()

df.write.mode("append").parquet("test")

+-------+---+---------+
|   name|age|     city|
+-------+---+---------+
|Shubham| 30|     Pune|
| Charan| 28|Bangalore|
| Bharat| 40|Bangalore|
+-------+---+---------+



In [55]:
data=[('Shubham',30,"Pune"),('Charan',28,"Bangalore"),('Bharat',40,"Bangalore")]
columns=["name","age","city"]

df=spark.createDataFrame(data,columns)

df.show()

df.write.mode("overwrite").csv("test_csv",header=True)

+-------+---+---------+
|   name|age|     city|
+-------+---+---------+
|Shubham| 30|     Pune|
| Charan| 28|Bangalore|
| Bharat| 40|Bangalore|
+-------+---+---------+



In [56]:
data=[('Vikram',25,"Hyderabad"),('Deepika',32,"Chennai"),('Rahul',35,"Delhi")]
columns=["name","age","city"]

df1=spark.createDataFrame(data,columns)

df1.show()

df1.write.mode("append").csv("test_csv",header=True)

+-------+---+---------+
|   name|age|     city|
+-------+---+---------+
| Vikram| 25|Hyderabad|
|Deepika| 32|  Chennai|
|  Rahul| 35|    Delhi|
+-------+---+---------+



In [39]:
par_df=spark.read.parquet("test")

par_df.show()

+-------+---+
|   name|age|
+-------+---+
|Shubham| 30|
|Shubham| 30|
| Charan| 28|
| Bharat| 40|
| Charan| 28|
| Bharat| 40|
+-------+---+



In [58]:
csv_df=spark.read.csv("test_csv",header=True)

csv_df.show()

+-------+---+---------+
|   name|age|     city|
+-------+---+---------+
| Charan| 28|Bangalore|
| Bharat| 40|Bangalore|
|Deepika| 32|  Chennai|
|  Rahul| 35|    Delhi|
| Vikram| 25|Hyderabad|
|Shubham| 30|     Pune|
+-------+---+---------+



In [68]:
data=[('Shubham',30),('Charan',28),('Bharat',40)]
columns=["name","age"]

df=spark.createDataFrame(data,columns)

df.show()

df.write.mode("overwrite").parquet("test_par")

data=[('Shubham',"Pune",30),('Charan',"Bangalore",28),('Bharat',"Bangalore",40)]
columns=["name","age","city"]

df1=spark.createDataFrame(data,columns)

df1.show()

df.write.mode("append").parquet("test_par")

+-------+---+
|   name|age|
+-------+---+
|Shubham| 30|
| Charan| 28|
| Bharat| 40|
+-------+---+

+-------+---------+----+
|   name|      age|city|
+-------+---------+----+
|Shubham|     Pune|  30|
| Charan|Bangalore|  28|
| Bharat|Bangalore|  40|
+-------+---------+----+



In [69]:
par_df=spark.read.parquet("test_par")

par_df.show()

+-------+---+
|   name|age|
+-------+---+
|Shubham| 30|
|Shubham| 30|
| Charan| 28|
| Bharat| 40|
| Charan| 28|
| Bharat| 40|
+-------+---+



In [71]:
data=[('Shubham',30),('Charan',28),('Bharat',40)]
columns=["name","age"]

df=spark.createDataFrame(data,columns)

df.show()

df.write.mode("overwrite").csv("test_csv2",header=True)

data=[('Shubham',"Pune",30),('Charan',"Bangalore",28),('Bharat',"Bangalore",40)]
columns=["name","age","city"]

df1=spark.createDataFrame(data,columns)

df1.show()

df.write.mode("append").csv("test_csv2",header=True)

+-------+---+
|   name|age|
+-------+---+
|Shubham| 30|
| Charan| 28|
| Bharat| 40|
+-------+---+

+-------+---------+----+
|   name|      age|city|
+-------+---------+----+
|Shubham|     Pune|  30|
| Charan|Bangalore|  28|
| Bharat|Bangalore|  40|
+-------+---------+----+



In [72]:
csv2_df=spark.read.csv("test_csv2",header=True)

csv2_df.show()

+-------+---+
|   name|age|
+-------+---+
| Charan| 28|
| Bharat| 40|
| Charan| 28|
| Bharat| 40|
|Shubham| 30|
|Shubham| 30|
+-------+---+

