In [1]:
from pyspark.sql import SparkSession
from pyspark.sql.types import *
from pyspark.sql.functions import *

# Build the notebook-wide SparkSession (application name 'deep'); every later
# cell reads and writes data through this `spark` handle.
spark = (
    SparkSession.builder
    .appName('deep')
    .getOrCreate()
)

In [2]:
# Bare expression: lets the notebook display the SparkSession created above.
spark

In [7]:
# Load the raw orders CSV with an explicit DDL schema (no inference pass),
# drop exact duplicate rows, then write the result as Parquet.
schema = 'id int, date date, cust_id int, status string'
df = (
    spark.read
    .schema(schema)
    .option('mode', 'permissive')
    .csv('/orders/orders_1gb.csv')
    .distinct()
)

# coalesce(1) collapses the output to one partition before the write;
# 'overwrite' replaces whatever is already stored under the target path.
df.coalesce(1) \
    .write \
    .format('parquet') \
    .mode('overwrite') \
    .save('/user/orders_deep/')

In [19]:
# Collect every col3 value into one list per (col1, col2) pair.
lt = [
    ['a', 'aa', 1],
    ['a', 'aa', 2],
    ['b', 'bb', 5],
    ['b', 'bb', 4],
    ['b', 'bb', 3],
]

# The frame is built through an RDD here; an alternative without the RDD is
#   spark.createDataFrame(lt).toDF(*['col1','col2','col3'])
df = spark.sparkContext.parallelize(lt).toDF(['col1', 'col2', 'col3'])

grouped = df.groupBy('col1', 'col2')
df = grouped.agg(collect_list('col3').alias('result'))

# Sort the groups by both keys, ascending, so the printed rows are stable.
df = df.orderBy('col1', 'col2')
df.show()

+----+----+---------+
|col1|col2|   result|
+----+----+---------+
|   a|  aa|   [1, 2]|
|   b|  bb|[5, 4, 3]|
+----+----+---------+



In [23]:
# Flatten the array column: emit one (id, eid) row per element of each
# record's 'eid' list.
lt = [
    {'id': 101, 'eid': [101, 102, 103]},
    {'id': 102, 'eid': [104, 105]},
]

df = spark.sparkContext.parallelize(lt).toDF()

# Selecting explicitly pins the column order to (id, eid) regardless of the
# order the dict-derived frame came back in.
df = df.select('id', explode('eid').alias('eid'))
df.show()

+---+---+
| id|eid|
+---+---+
|101|101|
|101|102|
|101|103|
|102|104|
|102|105|
+---+---+



In [5]:
# Per-ticker daily average price, plus the highest daily average of each ticker
# repeated on every one of that ticker's rows.
data = [
    ("2023-01-01", "AAPL", 150.00),
    ("2023-01-02", "AAPL", 155.00),
    ("2023-01-01", "GOOG", 2500.00),
    ("2023-01-02", "GOOG", 2550.00),
    ("2023-01-01", "MSFT", 300.00),
    ("2023-01-02", "MSFT", 310.00),
]

df = spark.createDataFrame(data, ['date', 'type', 'amt'])

# The dates arrive as strings; cast them to a real date column.
df = df.withColumn('date', to_date(col('date')))

# One row per (ticker, day) carrying that day's mean amount.
df = df.groupby('type', 'date').agg(
    mean(col('amt')).alias('avg_amt')
)

# Window aggregate: the max is computed per ticker without collapsing rows.
df = df.withColumn('max_avg_amt', expr('max(avg_amt) over(partition by type)'))

# DataFrame.sort reads the sort direction from the `ascending` keyword. The
# previous `asc=` keyword is not one it looks at, so the requested direction
# was silently dropped and the order was only right because ascending is the
# default.
df = df.sort(['type', 'date'], ascending=[True, True])
df.show()

+----+----------+-------+-----------+
|type|      date|avg_amt|max_avg_amt|
+----+----------+-------+-----------+
|AAPL|2023-01-01|  150.0|      155.0|
|AAPL|2023-01-02|  155.0|      155.0|
|GOOG|2023-01-01| 2500.0|     2550.0|
|GOOG|2023-01-02| 2550.0|     2550.0|
|MSFT|2023-01-01|  300.0|      310.0|
|MSFT|2023-01-02|  310.0|      310.0|
+----+----------+-------+-----------+



In [12]:
# Total salary per calendar month.
# The inputs only carry day and month ("22-may"), so a fixed year is appended
# to make them parseable as dates before the month number is extracted.
data = [
    (3000, "22-may"),
    (5000, "23-may"),
    (5000, "25-may"),
    (10000, "22-june"),
    (1250, "03-july"),
]

df = spark.createDataFrame(data, ['salary', 'date'])

# After this step the 'date' column holds the month number, not a date.
df = df.withColumn(
    'date',
    month(
        to_date(
            concat(col('date'), lit('-2025')),
            format='dd-MMMM-yyyy',
        )
    ),
)

# Despite its name, 'cumm_sum' is the plain per-month total of 'salary'.
df = df.groupby('date').agg(
    sum(col('salary')).alias('cumm_sum')
)
df.show()

+----+--------+
|date|cumm_sum|
+----+--------+
|   5|   13000|
|   7|    1250|
|   6|   10000|
+----+--------+



In [16]:
# Parse text delimited by the two-character sequence '~|' by rewriting each
# row to a single '|' delimiter, saving it, and reading it back as CSV.
s = '''Name~|Age
Brayan,gomez~|25
John,Cleark~|30
Sumit,Sen~|31
'''


def _to_pipe_row(line):
    """Rebuild one '~|'-delimited line as 'first|second' from its first two fields."""
    parts = line.split('~|')
    return parts[0] + '|' + parts[1]


# [1:] skips the header line; column names are assigned after the read.
rdd = spark.sparkContext.parallelize(s.splitlines()[1:])
rdd = rdd.map(_to_pipe_row)

# NOTE(review): saveAsTextFile is expected to refuse an existing output
# directory, which would make this cell fail on a second run — confirm.
rdd.saveAsTextFile('/user/data1/')

df = spark.read.csv('/user/data1/', sep='|').toDF('Name', 'Age')
df.show()
df.printSchema()

+------------+---+
|        Name|Age|
+------------+---+
| John,Cleark| 30|
|   Sumit,Sen| 31|
|Brayan,gomez| 25|
+------------+---+

root
 |-- Name: string (nullable = true)
 |-- Age: string (nullable = true)



In [22]:
# For every employee, report how many employees share their team.
lt = [[1, 8], [2, 8], [3, 8], [4, 7], [5, 9], [6, 9]]
df = spark.sparkContext.parallelize(lt).toDF(['employee_id', 'team_id'])

# Method 1: count rows per team, then join the counts back onto the employees.
temp = df.groupby('team_id').count()
final = df.join(temp, df['team_id'] == temp['team_id']).select('employee_id', 'count')

# Method 2: gather each team's employee ids into a list, explode the list back
# into rows, and use the list's size as the team count.
# This result is stored in `df`; only method 1's `final` is displayed below.
df = (
    df.groupby('team_id')
    .agg(collect_list(col('employee_id')).alias('collect'))
    .withColumn('employee_id', explode(col('collect')))
    .select('employee_id', size(col('collect')).alias('count'))
)
final.show()

+-----------+-----+
|employee_id|count|
+-----------+-----+
|          4|    1|
|          5|    2|
|          6|    2|
|          1|    3|
|          2|    3|
|          3|    3|
+-----------+-----+

