In [2]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import *
spark= SparkSession.builder.appName('day5').getOrCreate()

In [9]:
df=spark.read.format('csv').option('header','True').load('/content/JC-202503-citibike-tripdata.csv')

#read the full station name and also count the station trips
df1= df.groupBy('start_station_name').agg(count('start_station_name').alias('t_trips'))
df1.show(4,False)



+------------------+-------+
|start_station_name|t_trips|
+------------------+-------+
|Lafayette Park    |3      |
|Columbus Drive    |3      |
|Marin Light Rail  |3      |
|Lincoln Park      |3      |
+------------------+-------+
only showing top 4 rows



In [5]:
df1= spark.read.format("csv").option("header","True").load("/content/join1.csv")
df2=spark.read.format("csv").option("header","True").load("/content/join2.csv")

df1.show()
df2.show()

+-----+----------+------+
|txnno|   txndate|amount|
+-----+----------+------+
|    0|06-26-2011|040.33|
|    1|05-26-2011|198.44|
|    2|06-01-2011|005.58|
|    3|06-05-2011|198.19|
|    4|12-17-2011|098.81|
+-----+----------+------+

+---+-------+-------+
|tno| custno|spendby|
+---+-------+-------+
|  0|4007024| credit|
|  1|4006742| credit|
|  3|4002199| credit|
|  7|6768689|   cash|
+---+-------+-------+



In [8]:
df_join= df1.join(df2,df1.txnno==df2.tno)
df_join.show()

+-----+----------+------+---+-------+-------+
|txnno|   txndate|amount|tno| custno|spendby|
+-----+----------+------+---+-------+-------+
|    0|06-26-2011|040.33|  0|4007024| credit|
|    1|05-26-2011|198.44|  1|4006742| credit|
|    3|06-05-2011|198.19|  3|4002199| credit|
+-----+----------+------+---+-------+-------+



In [7]:
#inner Join

df_join= df1.join(df2,df1.txnno==df2.tno, 'inner')
df_join.show()

+-----+----------+------+---+-------+-------+
|txnno|   txndate|amount|tno| custno|spendby|
+-----+----------+------+---+-------+-------+
|    0|06-26-2011|040.33|  0|4007024| credit|
|    1|05-26-2011|198.44|  1|4006742| credit|
|    3|06-05-2011|198.19|  3|4002199| credit|
+-----+----------+------+---+-------+-------+



In [9]:
#left join

df1.join(df2, df1.txnno == df2.tno, 'left').show()



+-----+----------+------+----+-------+-------+
|txnno|   txndate|amount| tno| custno|spendby|
+-----+----------+------+----+-------+-------+
|    0|06-26-2011|040.33|   0|4007024| credit|
|    1|05-26-2011|198.44|   1|4006742| credit|
|    2|06-01-2011|005.58|NULL|   NULL|   NULL|
|    3|06-05-2011|198.19|   3|4002199| credit|
|    4|12-17-2011|098.81|NULL|   NULL|   NULL|
+-----+----------+------+----+-------+-------+



In [10]:
# Right join

df1.join(df2, df1.txnno == df2.tno, 'right').show()

+-----+----------+------+---+-------+-------+
|txnno|   txndate|amount|tno| custno|spendby|
+-----+----------+------+---+-------+-------+
|    0|06-26-2011|040.33|  0|4007024| credit|
|    1|05-26-2011|198.44|  1|4006742| credit|
|    3|06-05-2011|198.19|  3|4002199| credit|
| NULL|      NULL|  NULL|  7|6768689|   cash|
+-----+----------+------+---+-------+-------+



In [11]:
# full Join

df1.join(df2, df1.txnno == df2.tno, 'full').show()

+-----+----------+------+----+-------+-------+
|txnno|   txndate|amount| tno| custno|spendby|
+-----+----------+------+----+-------+-------+
|    0|06-26-2011|040.33|   0|4007024| credit|
|    1|05-26-2011|198.44|   1|4006742| credit|
|    2|06-01-2011|005.58|NULL|   NULL|   NULL|
|    3|06-05-2011|198.19|   3|4002199| credit|
|    4|12-17-2011|098.81|NULL|   NULL|   NULL|
| NULL|      NULL|  NULL|   7|6768689|   cash|
+-----+----------+------+----+-------+-------+



In [12]:
df1.join(df2, df1.txnno== df2.tno, 'full').drop('spendby').show()

+-----+----------+------+----+-------+
|txnno|   txndate|amount| tno| custno|
+-----+----------+------+----+-------+
|    0|06-26-2011|040.33|   0|4007024|
|    1|05-26-2011|198.44|   1|4006742|
|    2|06-01-2011|005.58|NULL|   NULL|
|    3|06-05-2011|198.19|   3|4002199|
|    4|12-17-2011|098.81|NULL|   NULL|
| NULL|      NULL|  NULL|   7|6768689|
+-----+----------+------+----+-------+



In [13]:
df1.join(df2, df1.txnno== df2.tno, 'full').select('txndate','amount').show()

+----------+------+
|   txndate|amount|
+----------+------+
|06-26-2011|040.33|
|05-26-2011|198.44|
|06-01-2011|005.58|
|06-05-2011|198.19|
|12-17-2011|098.81|
|      NULL|  NULL|
+----------+------+



In [14]:
# left_anti_join
df1.join(df2, df1.txnno==df2.tno, 'left_anti').show()

# left_semi_join

df1.join(df2, df1.txnno==df2.tno, 'left_semi').show()

+-----+----------+------+
|txnno|   txndate|amount|
+-----+----------+------+
|    2|06-01-2011|005.58|
|    4|12-17-2011|098.81|
+-----+----------+------+

+-----+----------+------+
|txnno|   txndate|amount|
+-----+----------+------+
|    0|06-26-2011|040.33|
|    1|05-26-2011|198.44|
|    3|06-05-2011|198.19|
+-----+----------+------+



In [19]:
#left_anti join using left_join

df1.join(df2, df1.txnno== df2.tno, 'left').filter(col('tno').isNull()).show()

+-----+----------+------+----+------+-------+
|txnno|   txndate|amount| tno|custno|spendby|
+-----+----------+------+----+------+-------+
|    2|06-01-2011|005.58|NULL|  NULL|   NULL|
|    4|12-17-2011|098.81|NULL|  NULL|   NULL|
+-----+----------+------+----+------+-------+



In [20]:
df= spark.read.format("csv").option('header','True').load('/content/travel.csv')
df.show()

+-------+---------+---------+-----------+-----+
|cust_id|flight_id|   origin|destination|price|
+-------+---------+---------+-----------+-----+
|      1|       f3|    kochi|  Mangalore| 1800|
|      1|       f1|    delhi|  hyderabad| 2500|
|      2|       f2|  Ayodhya|    chennai| 3000|
|      1|       f2|hyderabad|      kochi| 1700|
|      2|       f1|   Mumbai|    Ayodhya| 4000|
+-------+---------+---------+-----------+-----+



In [27]:
df_results= df.orderBy('cust_id','flight_id')
df_results.show()


+-------+---------+---------+-----------+-----+
|cust_id|flight_id|   origin|destination|price|
+-------+---------+---------+-----------+-----+
|      1|       f1|    delhi|  hyderabad| 2500|
|      1|       f2|hyderabad|      kochi| 1700|
|      1|       f3|    kochi|  Mangalore| 1800|
|      2|       f1|   Mumbai|    Ayodhya| 4000|
|      2|       f2|  Ayodhya|    chennai| 3000|
+-------+---------+---------+-----------+-----+



In [29]:
df= df_results.groupBy('cust_id').agg(first('origin').alias('source'),\
                                      last('destination').alias('destination'),\
                                      sum('price').alias('total_Cost')
                                    )
df.show()

+-------+------+-----------+----------+
|cust_id|source|destination|total_Cost|
+-------+------+-----------+----------+
|      1| delhi|  Mangalore|    6000.0|
|      2|Mumbai|    chennai|    7000.0|
+-------+------+-----------+----------+



In [53]:
from pyspark.sql.functions import *

data= [('India','SL','India'),('SL','Aus','Aus'),('SA','Eng','Eng'),('Eng','NZ','NZ'),('Aus','India','India')]

columns= ['Team1','Team2','Winner']

df=spark.createDataFrame(data,columns)
df.show()

+-----+-----+------+
|Team1|Team2|Winner|
+-----+-----+------+
|India|   SL| India|
|   SL|  Aus|   Aus|
|   SA|  Eng|   Eng|
|  Eng|   NZ|    NZ|
|  Aus|India| India|
+-----+-----+------+



In [69]:
from pyspark.sql.functions import count
df1= df.select('Team1')
df2= df.select('Team2')

df_union=df1.unionAll(df2)

#aggreation

df_aggregate = df_union.groupBy('Team1').agg(count('Team1').alias('matches_played'))
df_changed= df_aggregate.withColumnRenamed('Team1','Team_name')

df_changed.show()

# join

df_join= df_changed.join(df, df_changed.Team_name== df.Winner,'left').drop('Team1','Team2')
df_join.show()

# agg

df_agg= df_join.groupBy('Team_name','matches_played').agg(count('Winner').alias('No_of_wins'))
df_agg.show()

#final

from pyspark.sql.types import *

df_final= df_agg.withColumn('matches_played',col('matches_played').cast(IntegerType()))\
                .withColumn('No_of_wins',col('No_of_wins').cast(IntegerType()))\
                .withColumn('No_of_losses',col('matches_played')-col('No_of_wins'))

df_final.show()







+---------+--------------+
|Team_name|matches_played|
+---------+--------------+
|       SL|             2|
|    India|             2|
|      Eng|             2|
|       SA|             1|
|      Aus|             2|
|       NZ|             1|
+---------+--------------+

+---------+--------------+------+
|Team_name|matches_played|Winner|
+---------+--------------+------+
|       SL|             2|  NULL|
|    India|             2| India|
|    India|             2| India|
|      Eng|             2|   Eng|
|       SA|             1|  NULL|
|      Aus|             2|   Aus|
|       NZ|             1|    NZ|
+---------+--------------+------+

+---------+--------------+----------+
|Team_name|matches_played|No_of_wins|
+---------+--------------+----------+
|       SL|             2|         0|
|    India|             2|         2|
|      Eng|             2|         1|
|       SA|             1|         0|
|      Aus|             2|         1|
|       NZ|             1|         1|
+---------+-

In [3]:
data= [(1,1,'N'),
       (2,1,'Y'),
       (2,2,'N'),
       (3,3,'N'),
       (4,2,'N'),
       (4,3,'Y'),
       (4,4,'N')]

columns= ['employee_id','department_id','primary_flag']

df= spark.createDataFrame(data,columns)
df.show()

+-----------+-------------+------------+
|employee_id|department_id|primary_flag|
+-----------+-------------+------------+
|          1|            1|           N|
|          2|            1|           Y|
|          2|            2|           N|
|          3|            3|           N|
|          4|            2|           N|
|          4|            3|           Y|
|          4|            4|           N|
+-----------+-------------+------------+



In [14]:
from pyspark.sql.functions import *

df1= df.select('employee_id')
df2= df.select('department_id')
df3= df.select('primary_flag')

df.select('employee_id','department_id').withColumn('primary_flag', when('primary_flag'== 'Y', 'Y').otherwise ('primary_flag'))

PySparkTypeError: [NOT_COLUMN] Argument `condition` should be a Column, got bool.

In [19]:
df_result= df.withColumn('Rank_flag', when (col('primary_flag')=='Y',1).otherwise (2)).orderBy('employee_id','Rank_flag')


+-----------+-------------+------------+---------+
|employee_id|department_id|primary_flag|Rank_flag|
+-----------+-------------+------------+---------+
|          1|            1|           N|        2|
|          2|            1|           Y|        1|
|          2|            2|           N|        2|
|          3|            3|           N|        2|
|          4|            3|           Y|        1|
|          4|            2|           N|        2|
|          4|            4|           N|        2|
+-----------+-------------+------------+---------+

