In [67]:
from pyspark.sql import SparkSession
from pyspark import SparkContext

In [2]:
spark=SparkSession.builder.appName(name='Pyspark examples').getOrCreate()

Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
25/05/13 06:56:26 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable


In [3]:
# DataFrame’dan faqat `name` va `age` ustunlarini tanlang

df = spark.createDataFrame([("Ali", 25), ("Vali", 30)], ["name", "age"])

df.select('name', 'age').show()

                                                                                                    

+----+---+
|name|age|
+----+---+
| Ali| 25|
|Vali| 30|
+----+---+



In [4]:
# Faqat `age > 25` bo‘lgan yozuvlarni filter qiling.

df = spark.createDataFrame([("Ali", 25), ("Vali", 30)], ["name", "age"])

df.filter(df['age']>25).show()

+----+---+
|name|age|
+----+---+
|Vali| 30|
+----+---+



In [5]:
# Har bir ismga "Mr." prefixini qo‘shib chiqaring
from pyspark.sql.functions import concat, lit

df = spark.createDataFrame([("Ali",), ("Vali",)], ["name"])

df = df.withColumn("name", concat(lit("Mr. "), df["name"]))

df.show()

+--------+
|    name|
+--------+
| Mr. Ali|
|Mr. Vali|
+--------+



In [6]:
# `age` ustuniga 5 qo‘shing va yangi ustun sifatida yarating.

df = spark.createDataFrame([("Ali", 25), ("Vali", 30)], ["name", "age"])

df=df.withColumn('algo', df['age']+5)

df.show()

+----+---+----+
|name|age|algo|
+----+---+----+
| Ali| 25|  30|
|Vali| 30|  35|
+----+---+----+



In [7]:
# DataFrame’da qancha satr borligini hisoblang.

df = spark.createDataFrame([("Ali", 25), ("Vali", 30), ("Hasan", 22)], ["name", "age"])

df.count()

3

In [8]:
# Barcha yoshlarning o‘rtacha qiymatini hisoblang.
from pyspark.sql.functions import avg, col
df = spark.createDataFrame([("Ali", 25), ("Vali", 30), ("Hasan", 22)], ["name", "age"])
df.select(avg(col("age"))).show()


+------------------+
|          avg(age)|
+------------------+
|25.666666666666668|
+------------------+



In [9]:
# Ismlarni alifbo tartibida sort qiling.
df = spark.createDataFrame([("Vali",), ("Ali",), ("Hasan",)], ["name"])

df.sort('name').show()

+-----+
| name|
+-----+
|  Ali|
|Hasan|
| Vali|
+-----+



In [10]:
# DataFrame’ga gender ustunini "male" deb qo‘shing.
from pyspark.sql.functions import lit
df = spark.createDataFrame([("Ali", 25), ("Vali", 30)], ["name", "age"])

df.withColumn('gender', lit('male'))

df.show()

+----+---+
|name|age|
+----+---+
| Ali| 25|
|Vali| 30|
+----+---+



In [11]:
# Ismlarning uzunligini hisoblab yangi ustun yarating.
from pyspark.sql.functions import length

df = spark.createDataFrame([("Ali",), ("Vali",)], ["name"])

df = df.withColumn('length', length("name"))

df.show()

+----+------+
|name|length|
+----+------+
| Ali|     3|
|Vali|     4|
+----+------+



In [12]:
# Yoshni 10 ga bo‘lingan qiymatini hisoblab ustun yarating.

df = spark.createDataFrame([("Ali", 25), ("Vali", 30)], ["name", "age"])

df=df.withColumn('algo', df['age']/10)

df.show()

+----+---+----+
|name|age|algo|
+----+---+----+
| Ali| 25| 2.5|
|Vali| 30| 3.0|
+----+---+----+



In [13]:
# Har bir department bo‘yicha o‘rtacha maoshni toping.

df = spark.createDataFrame([("IT", 1000), ("HR", 800), ("IT", 1200)], ["department", "salary"])

df.groupBy('department').agg({'salary':'avg'}).show()

+----------+-----------+
|department|avg(salary)|
+----------+-----------+
|        IT|     1100.0|
|        HR|      800.0|
+----------+-----------+



In [14]:
# Har bir department bo‘yicha ishchilar sonini hisoblang.

df = spark.createDataFrame([("IT",), ("HR",), ("IT",)], ["department"])

df.groupBy('department').count().show()

+----------+-----+
|department|count|
+----------+-----+
|        IT|    2|
|        HR|    1|
+----------+-----+



In [15]:
# Eng yuqori maoshli departmentni toping.
from pyspark.sql.functions import max
df = spark.createDataFrame([("IT", 1000), ("HR", 2000), ("Sales", 1500)], ["department", "salary"])

# 1-usul 
# df.orderBy("salary", ascending=False).limit(1).show()

# 2-usul
max_salary = df.agg(max("salary").alias("max_salary")).collect()[0]["max_salary"]
df.filter(df.salary == max_salary).show()

+----------+------+
|department|salary|
+----------+------+
|        HR|  2000|
+----------+------+



In [16]:
# gender bo‘yicha salary summasini toping.

df = spark.createDataFrame([("male", 1000), ("female", 1200), ("male", 800)], ["gender", "salary"])

df.groupBy('gender').agg({'salary': 'sum'}).show()


+------+-----------+
|gender|sum(salary)|
+------+-----------+
|  male|       1800|
|female|       1200|
+------+-----------+



In [17]:
# Har bir cityda eng katta yoshi bor foydalanuvchini toping.

df = spark.createDataFrame([("Tashkent", "Ali", 25), ("Tashkent", "Vali", 35), ("Bukhara", "Sami", 30)], ["city", "name", "age"])

df_maxs=df.groupBy('city').agg({'age': 'max'}).show()

+--------+--------+
|    city|max(age)|
+--------+--------+
|Tashkent|      35|
| Bukhara|      30|
+--------+--------+



In [18]:
# Null qiymatli satrlarni chiqarib tashlang.

df = spark.createDataFrame([("Ali", 25), (None, 30), ("Vali", None)], ["name", "age"])

df.dropna().show()
df.show()

+----+---+
|name|age|
+----+---+
| Ali| 25|
+----+---+

+----+----+
|name| age|
+----+----+
| Ali|  25|
|NULL|  30|
|Vali|NULL|
+----+----+



In [19]:
# age bo‘yicha minimal va maksimal qiymatni toping.
from pyspark.sql.functions import max, min
df = spark.createDataFrame([("Ali", 25), ("Vali", 30), ("Hasan", 20)], ["name", "age"])

df.select(max('age')).show()
df.select(min('age')).show()

+--------+
|max(age)|
+--------+
|      30|
+--------+

+--------+
|min(age)|
+--------+
|      20|
+--------+



In [20]:
# department bo‘yicha salaryning o‘rtacha, min va max qiymatlarini toping.
df = spark.createDataFrame([("IT", 1000), ("HR", 2000), ("IT", 1500)], ["department", "salary"])

df.groupBy('department').agg({'salary':'min'}).show()
df.groupBy('department').agg({'salary':'min'}).show()
df.groupBy('department').agg({'salary':'avg'}).show()

+----------+-----------+
|department|min(salary)|
+----------+-----------+
|        IT|       1000|
|        HR|       2000|
+----------+-----------+

+----------+-----------+
|department|min(salary)|
+----------+-----------+
|        IT|       1000|
|        HR|       2000|
+----------+-----------+

+----------+-----------+
|department|avg(salary)|
+----------+-----------+
|        IT|     1250.0|
|        HR|     2000.0|
+----------+-----------+



In [21]:
# Har bir department bo‘yicha eng so‘nggi qo‘shilgan xodimni toping (join_date).

df = spark.createDataFrame([("IT", "Ali", "2023-01-01"), ("IT", "Vali", "2024-01-01")], ["department", "name", "join_date"])
df.withColumn('join_date', df['join_date'].cast('date'))
df.groupBy('department').agg({'join_date':'max'}).show()


+----------+--------------+
|department|max(join_date)|
+----------+--------------+
|        IT|    2024-01-01|
+----------+--------------+



In [32]:
# Har bir foydalanuvchining salary darajasi above average yoki below average ekanligini belgilang.
from pyspark.sql.functions import avg, col, when

df = spark.createDataFrame([("Ali", 1000), ("Vali", 2000), ("Sami", 1500)], ["name", "salary"])

avg_salary = df.select(avg("salary")).collect()[0][0]

df = df.withColumn("level", when(col("salary") >= avg_salary, "above average").otherwise("below average"))

df.show()

+----+------+-------------+
|name|salary|        level|
+----+------+-------------+
| Ali|  1000|below average|
|Vali|  2000|above average|
|Sami|  1500|above average|
+----+------+-------------+



In [39]:
# Ikkita DataFrame’ni id bo‘yicha birlashtiring.
df1 = spark.createDataFrame([(1, "Ali"), (2, "Vali")], ["id", "name"])
df2 = spark.createDataFrame([(1, "IT"), (2, "HR")], ["id", "department"])

df=df1.join(df2, df1['id']==df2['id'], 'inner')

df.select(df1['id'], 'name', 'department').show()

+---+----+----------+
| id|name|department|
+---+----+----------+
|  1| Ali|        IT|
|  2|Vali|        HR|
+---+----+----------+



In [41]:
# Chap join orqali barcha foydalanuvchilar va ularning bo‘limlarini ko‘rsating.

df=df1.join(df2, df1['id']==df2['id'], 'left')

df.show()

+---+----+---+----------+
| id|name| id|department|
+---+----+---+----------+
|  1| Ali|  1|        IT|
|  2|Vali|  2|        HR|
+---+----+---+----------+



In [42]:
# To‘liq outer join orqali barcha ma’lumotlarni qo‘shing.

df=df1.join(df2, df1['id']==df2['id'], 'outer')

df.show()

+---+----+---+----------+
| id|name| id|department|
+---+----+---+----------+
|  1| Ali|  1|        IT|
|  2|Vali|  2|        HR|
+---+----+---+----------+



In [46]:
# Join qiling, lekin faqat mos kelmaganlarini ko‘rsating (anti join).

df=df1.join(df2, df1['id']==df2['id'], 'left_anti')

df.show()


+---+----+
| id|name|
+---+----+
+---+----+



In [50]:
# Ikkita jadvalni birlashtiring va fullName ustunini yarating.

df1 = spark.createDataFrame([(1, "Ali")], ["id", "firstName"])
df2 = spark.createDataFrame([(1, "Valiyev")], ["id", "lastName"])

df=df1.join(df2, df1['id']==df2['id'], how='inner')

df=df.withColumn('FullName', df['firstName']+df['lastName'])

df.show()

+---+---------+---+--------+--------+
| id|firstName| id|lastName|FullName|
+---+---------+---+--------+--------+
|  1|      Ali|  1| Valiyev|    NULL|
+---+---------+---+--------+--------+



In [63]:
# user_id bo‘yicha orders va users ni join qiling.

users = spark.createDataFrame([(1, "Ali"), (2, "Vali")], ["user_id", "name"])
orders = spark.createDataFrame([(1, 100), (1, 200), (2, 150)], ["user_id", "amount"])

df=users.join(orders, users['user_id']==orders['user_id'], 'inner')

df.show()

+-------+----+-------+------+
|user_id|name|user_id|amount|
+-------+----+-------+------+
|      1| Ali|      1|   100|
|      1| Ali|      1|   200|
|      2|Vali|      2|   150|
+-------+----+-------+------+



                                                                                                    

In [65]:
# Join qiling va har bir foydalanuvchining umumiy buyurtma miqdorini hisoblang.
from pyspark.sql.functions import sum
df = users.join(orders, on="user_id", how="inner")
df.groupBy("user_id", "name").agg(sum("amount").alias("total_amount")).show()

+-------+----+------------+
|user_id|name|total_amount|
+-------+----+------------+
|      1| Ali|         300|
|      2|Vali|         150|
+-------+----+------------+



In [None]:
# users jadvalidan hech qanday buyurtma qilmagan foydalanuvchilarni toping.

df=users.join(orders, users['user_id']==orders['user_id'], 'left_anti').show()

In [70]:
# Har bir department ichida salary bo‘yicha rank yarating.

df = spark.createDataFrame([("IT", "Ali", 1000), ("IT", "Vali", 1200), ("HR", "Sami", 1100)], ["dept", "name", "salary"])

df=df.groupBy('dept').agg({'salary': 'sum'})

df.sort('sum(salary)').show()

+----+-----------+
|dept|sum(salary)|
+----+-----------+
|  HR|       1100|
|  IT|       2200|
+----+-----------+



In [73]:
# Rolling average hisoblang oxirgi 3 satr uchun. (1-usul)

df = spark.createDataFrame([(1, 100), (2, 200), (3, 300), (4, 400)], ["day", "value"])

df=df.filter(df['day']>1).agg({'value': 'avg'})

df.show()


+----------+
|avg(value)|
+----------+
|     300.0|
+----------+



In [77]:
# Rolling average hisoblang oxirgi 3 satr uchun. (1-usul)
from pyspark.sql.functions import avg
from pyspark.sql.window import Window

df = spark.createDataFrame([(1, 100), (2, 200), (3, 300), (4, 400)], ["day", "value"])

n=3

windowSpec = Window.orderBy("day").rowsBetween(-n + 1, 0)

df = df.withColumn("rolling_avg", avg("value").over(windowSpec))

df.show()


25/05/13 08:31:24 WARN WindowExec: No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation.
25/05/13 08:31:24 WARN WindowExec: No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation.
25/05/13 08:31:24 WARN WindowExec: No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation.
25/05/13 08:31:24 WARN WindowExec: No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation.
25/05/13 08:31:24 WARN WindowExec: No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation.


+---+-----+-----------+
|day|value|rolling_avg|
+---+-----+-----------+
|  1|  100|      100.0|
|  2|  200|      150.0|
|  3|  300|      200.0|
|  4|  400|      300.0|
+---+-----+-----------+



In [81]:
# LAG funksiyasi yordamida avvalgi satrdagi qiymatni chiqarish.
from pyspark.sql.functions import lag
from pyspark.sql.window import Window

windowSpec = Window.orderBy("day")

df = df.withColumn("prev_value", lag("value", 1).over(windowSpec))

df.show()

25/05/13 08:37:39 WARN WindowExec: No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation.
25/05/13 08:37:39 WARN WindowExec: No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation.
25/05/13 08:37:39 WARN WindowExec: No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation.


+---+-----+-----------+----------+
|day|value|rolling_avg|prev_value|
+---+-----+-----------+----------+
|  1|  100|      100.0|      NULL|
|  2|  200|      150.0|       100|
|  3|  300|      200.0|       200|
|  4|  400|      300.0|       300|
+---+-----+-----------+----------+



25/05/13 08:37:39 WARN WindowExec: No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation.
25/05/13 08:37:39 WARN WindowExec: No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation.


In [85]:
# LEAD funksiyasi yordamida keyingi qiymatni ko‘rsatish.
from pyspark.sql.functions import lead

windowSpec = Window.orderBy("day")

df = df.withColumn("prev_value", lead("value", 1).over(windowSpec))

df.show()

25/05/13 08:40:36 WARN WindowExec: No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation.
25/05/13 08:40:36 WARN WindowExec: No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation.
25/05/13 08:40:36 WARN WindowExec: No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation.


+---+-----+-----------+----------+
|day|value|rolling_avg|prev_value|
+---+-----+-----------+----------+
|  1|  100|      100.0|       200|
|  2|  200|      150.0|       300|
|  3|  300|      200.0|       400|
|  4|  400|      300.0|      NULL|
+---+-----+-----------+----------+



25/05/13 08:40:36 WARN WindowExec: No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation.
25/05/13 08:40:36 WARN WindowExec: No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation.


In [None]:
# Foydalanuvchining umumiy buyurtmalarini hisoblash (partition by).

df = spark.createDataFrame([("Ali", 100), ("Ali", 200), ("Vali", 300)], ["name", "amount"])

windowSpec = Window.partitionBy("name")
df = df.withColumn("total_amount", sum("amount").over(windowSpec))

df.show()


In [96]:
# Row number har bir foydalanuvchining buyurtmalari bo‘yicha yarating.
from pyspark.sql.functions import row_number

df = spark.createDataFrame([("Ali", 100), ("Ali", 200), ("Vali", 300)], ["name", "amount"])

# 1-usul
# df.groupBy('name').agg({'amount': 'sum'}).show()

# 2-usul

windowSpec = Window.partitionBy("name").orderBy("amount")

# Row number ustunini qo‘shish
df = df.withColumn("row_num", row_number().over(windowSpec))

df.show()

+----+------+-------+
|name|amount|row_num|
+----+------+-------+
| Ali|   100|      1|
| Ali|   200|      2|
|Vali|   300|      1|
+----+------+-------+



In [101]:
# Har bir bo‘limda eng ko‘p maosh olgan xodimni tanlang.

df = spark.createDataFrame([("IT", "Ali", 1000), ("IT", "Vali", 1200), ("HR", "Sami", 1100)], ["dept", "name", "salary"])

sort=Window.partitionBy('dept')

df=df.withColumn('max', max('salary').over(sort))

df.filter(df['salary']==df['max']).show()

+----+----+------+----+
|dept|name|salary| max|
+----+----+------+----+
|  HR|Sami|  1100|1100|
|  IT|Vali|  1200|1200|
+----+----+------+----+



In [106]:
# Ish boshlagan kundan boshlab umumiy daromadni hisoblang.
from pyspark.sql.functions import sum

df = spark.createDataFrame([("Ali", "2024-01-01", 100), ("Ali", "2024-01-02", 150)], ["name", "date", "amount"])

sort=Window.partitionBy('name')

df=df.withColumn('sum', sum('amount').over(sort))

df.first()

Row(name='Ali', date='2024-01-01', amount=100, sum=250)

In [112]:
# Nested struktura (StructType) bilan ishlang va fieldni ajrating.
from pyspark.sql.functions import col

df = spark.createDataFrame([(("Ali", 25),)], ["person"])

df2 = df.select(
    col("person._1").alias("name"),
    col("person._2").alias("age")
)

df2.show()

+----+---+
|name|age|
+----+---+
| Ali| 25|
+----+---+



In [120]:
# Array ustunidan elementlarni ajrating.
from pyspark.sql.functions import col, explode

try:
    df = spark.createDataFrame([(["Ali", "Vali"],)], ["names"])

    df2=df.select(
        col('names')[0].alias('name1'),
        col('names')[1].alias('name2')
    )
    
    df2.show()
except Exception as e:
    print(e)


+-----+-----+
|name1|name2|
+-----+-----+
|  Ali| Vali|
+-----+-----+



In [122]:
# Explode funksiyasi orqali array’ni to‘g‘rilang.

df2=df.select(explode(df['names']).alias('name'))

df2.show()


+----+
|name|
+----+
| Ali|
|Vali|
+----+



In [127]:
# JSON ustunini parse qiling.
from pyspark.sql.functions import json_tuple

df = spark.createDataFrame([('{"name": "Ali", "age": 25}',)], ["json_col"])

df2 = df.select(
    json_tuple(col("json_col"), "name", "age").alias("name", "age")
)

df2.show()

+----+---+
|name|age|
+----+---+
| Ali| 25|
+----+---+



In [137]:
from pyspark.sql import functions as F

# DataFrame yaratish
df = spark.createDataFrame([("2024-01-01",)], ["date"])

# Date ustunini Date turiga o'zgartirish
df = df.withColumn("date", df["date"].cast("date"))

# Yil va oy ajratish
df = df.withColumn("year", F.year(df["date"])) \
       .withColumn("month", F.month(df["date"]))

# Natijani ko‘rish
df.show()


+----------+----+-----+
|      date|year|month|
+----------+----+-----+
|2024-01-01|2024|    1|
+----------+----+-----+

