# Init

In [3]:
import os, sys
os.environ['JAVA_HOME'] = '/usr/lib/jvm/java-21-openjdk-amd64'
os.environ['PYSPARK_PYTHON'] = sys.executable                # Путь к текущему интерпретатору
os.environ['PYSPARK_DRIVER_PYTHON'] = sys.executable 

In [2]:
from pyspark import SparkConf, SparkContext

In [3]:
appName = 'app'
master = 'local[*]'

conf = SparkConf().setAppName(appName).setMaster(master) \
                  .set("spark.executor.extrajavaoptions", "-Xss64m") \
                  .set("spark.python.daemon.module", "pyspark.daemon") \
                  .set("spark.python.worker.module", "pyspark.worker") \
                  .set("spark.python.worker.reuse", "true") \
                  .set("spark.python.worker.python", os.getenv('PYTHON_PATH'))
sc = SparkContext(conf=conf)

Picked up _JAVA_OPTIONS: -Dawt.useSystemAAFontSettings=on -Dswing.aatext=true
Picked up _JAVA_OPTIONS: -Dawt.useSystemAAFontSettings=on -Dswing.aatext=true
24/07/22 13:25:01 WARN Utils: Your hostname, kali resolves to a loopback address: 127.0.1.1; using 192.168.0.101 instead (on interface eth0)
24/07/22 13:25:01 WARN Utils: Set SPARK_LOCAL_IP if you need to bind to another address
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
24/07/22 13:25:02 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable


In [4]:
sc

24/07/22 13:25:16 WARN GarbageCollectionMetrics: To enable non-built-in garbage collector(s) List(G1 Concurrent GC), users should configure it(them) to spark.eventLog.gcMetrics.youngGenerationGarbageCollectors or spark.eventLog.gcMetrics.oldGenerationGarbageCollectors


# RDD

In [31]:
nums = list(range(0, 1000000))

nums_rdd = sc.parallelize(nums)
nums_rdd.take(5)
# ...
# операции над RDD
# ...

24/07/22 14:46:11 WARN TaskSetManager: Stage 21 contains a task of very large size (2332 KiB). The maximum recommended task size is 1000 KiB.
                                                                                

[0, 1, 2, 3, 4]

# SQL

In [4]:
from pyspark.sql import SparkSession

In [5]:
master = 'local[*]'

spark = SparkSession            \
        .builder                \
        .master(master)         \
        .getOrCreate()  

spark.conf.set('spark.sql.repl.eagerEval.enabled', True) # format output tables better
spark

Picked up _JAVA_OPTIONS: -Dawt.useSystemAAFontSettings=on -Dswing.aatext=true
Picked up _JAVA_OPTIONS: -Dawt.useSystemAAFontSettings=on -Dswing.aatext=true
24/07/22 19:15:45 WARN Utils: Your hostname, kali resolves to a loopback address: 127.0.1.1; using 192.168.0.101 instead (on interface eth0)
24/07/22 19:15:45 WARN Utils: Set SPARK_LOCAL_IP if you need to bind to another address
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
24/07/22 19:15:47 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable


### DataFrame

In [48]:
import kaggle

# Войти на Kaggle
kaggle.api.authenticate()
# Загрузить набор данных
kaggle.api.dataset_download_files('brendan45774/test-file', path='tested', unzip=True)

Dataset URL: https://www.kaggle.com/datasets/brendan45774/test-file


In [51]:
os.system('mv tested data')

0

In [6]:
# Загрузить .csv в датафрейм
titanic_df = spark.read.csv('data/tested.csv', header=True, inferSchema=True)
titanic_df.limit(5)

24/07/22 19:16:02 WARN GarbageCollectionMetrics: To enable non-built-in garbage collector(s) List(G1 Concurrent GC), users should configure it(them) to spark.eventLog.gcMetrics.youngGenerationGarbageCollectors or spark.eventLog.gcMetrics.oldGenerationGarbageCollectors
                                                                                

PassengerId,Survived,Pclass,Name,Sex,Age,SibSp,Parch,Ticket,Fare,Cabin,Embarked
892,0,3,"Kelly, Mr. James",male,34.5,0,0,330911,7.8292,,Q
893,1,3,"Wilkes, Mrs. Jame...",female,47.0,1,0,363272,7.0,,S
894,0,2,"Myles, Mr. Thomas...",male,62.0,0,0,240276,9.6875,,Q
895,0,3,"Wirz, Mr. Albert",male,27.0,0,0,315154,8.6625,,S
896,1,3,"Hirvonen, Mrs. Al...",female,22.0,1,1,3101298,12.2875,,S


In [5]:
titanic_df.printSchema()

root
 |-- PassengerId: integer (nullable = true)
 |-- Survived: integer (nullable = true)
 |-- Pclass: integer (nullable = true)
 |-- Name: string (nullable = true)
 |-- Sex: string (nullable = true)
 |-- Age: double (nullable = true)
 |-- SibSp: integer (nullable = true)
 |-- Parch: integer (nullable = true)
 |-- Ticket: string (nullable = true)
 |-- Fare: double (nullable = true)
 |-- Cabin: string (nullable = true)
 |-- Embarked: string (nullable = true)



In [6]:
titanic_df.select('PassengerId', 'Survived').limit(5)

PassengerId,Survived
892,0
893,1
894,0
895,0
896,1


In [9]:
titanic_df.where(
                  (titanic_df.Age > 25) & (titanic_df.Survived == 1)
                )                                                                   \
                .limit(5)

PassengerId,Survived,Pclass,Name,Sex,Age,SibSp,Parch,Ticket,Fare,Cabin,Embarked
893,1,3,"Wilkes, Mrs. Jame...",female,47.0,1,0,363272,7.0,,S
898,1,3,"Connolly, Miss. Kate",female,30.0,0,0,330972,7.6292,,Q
906,1,1,"Chaffee, Mrs. Her...",female,47.0,1,0,W.E.P. 5734,61.175,E31,S
910,1,3,"Ilmakangas, Miss....",female,27.0,1,0,STON/O2. 3101270,7.925,,S
911,1,3,"""Assaf Khalil, Mr...",female,45.0,0,0,2696,7.225,,C


In [10]:
titanic_df.agg({'Fare':'avg'})

avg(Fare)
35.6271884892086


In [13]:
titanic_df.groupBy('Pclass')                    \
          .agg({'Fare': 'avg'})                 \
          .orderBy('Pclass', ascending=False)

Pclass,avg(Fare)
3,12.459677880184334
2,22.20210430107527
1,94.28029719626169


In [14]:
titanic_df.filter(titanic_df.Age > 25)          \
          .agg({'Fare': 'avg'})

avg(Fare)
54.10220476190474


In [13]:
from pyspark.sql.types import IntegerType
from pyspark.sql.functions import udf # user defined func - функция определенная пользователем

def round_float_down(x):
    return int(x)

round_float_down_udf = udf(round_float_down, IntegerType())

titanic_df.select(
                  'PassengerId', 
                  'Fare',
                  round_float_down_udf('Fare').alias('Fare Rounded Down')) \
          .limit(5)

                                                                                

PassengerId,Fare,Fare Rounded Down
892,7.8292,7
893,7.0,7
894,9.6875,9
895,8.6625,8
896,12.2875,12


In [20]:
titanic_df.createOrReplaceTempView('Titanic')

In [21]:
spark.sql(
"""
SELECT * 
FROM Titanic
LIMIT 5
"""
         )

PassengerId,Survived,Pclass,Name,Sex,Age,SibSp,Parch,Ticket,Fare,Cabin,Embarked
892,0,3,"Kelly, Mr. James",male,34.5,0,0,330911,7.8292,,Q
893,1,3,"Wilkes, Mrs. Jame...",female,47.0,1,0,363272,7.0,,S
894,0,2,"Myles, Mr. Thomas...",male,62.0,0,0,240276,9.6875,,Q
895,0,3,"Wirz, Mr. Albert",male,27.0,0,0,315154,8.6625,,S
896,1,3,"Hirvonen, Mrs. Al...",female,22.0,1,1,3101298,12.2875,,S


# Functions

In [15]:
from pyspark.sql import functions as F

In [27]:
newDf = titanic_df.select(
                            'Name', 
                            F.concat('Name', 'Sex').alias('Name_sex')
                        )
newDf.limit(5)

Name,Name_sex
"Kelly, Mr. James","Kelly, Mr. Jamesmale"
"Wilkes, Mrs. Jame...","Wilkes, Mrs. Jame..."
"Myles, Mr. Thomas...","Myles, Mr. Thomas..."
"Wirz, Mr. Albert","Wirz, Mr. Albertmale"
"Hirvonen, Mrs. Al...","Hirvonen, Mrs. Al..."


## drop column

In [28]:
newDf = newDf.drop(newDf['Name_sex'])
newDf.limit(5)

                                                                                

Name
"Kelly, Mr. James"
"Wilkes, Mrs. Jame..."
"Myles, Mr. Thomas..."
"Wirz, Mr. Albert"
"Hirvonen, Mrs. Al..."


## add column

In [29]:
newDf = newDf.withColumn('first_letter', F.substring('Name', 1, 1))
newDf.limit(5)

Name,first_letter
"Kelly, Mr. James",K
"Wilkes, Mrs. Jame...",W
"Myles, Mr. Thomas...",M
"Wirz, Mr. Albert",W
"Hirvonen, Mrs. Al...",H


## filter

In [32]:
titanic_df.filter(
                    F.col('Name')
                    .startswith('K')
                 )                                               \
           .orderBy('Name', ascending=True)                      \
           .limit(5)                                            

PassengerId,Survived,Pclass,Name,Sex,Age,SibSp,Parch,Ticket,Fare,Cabin,Embarked
1115,0,3,"Karlsson, Mr. Ein...",male,21.0,0,0,350053,7.7958,,S
1087,0,3,"Karlsson, Mr. Jul...",male,33.0,0,0,347465,7.8542,,S
1138,1,2,"Karnes, Mrs. J Fr...",female,22.0,0,0,F.C.C. 13534,21.0,,S
932,0,3,"Karun, Mr. Franz",male,39.0,0,1,349256,13.4167,,C
908,0,2,"Keane, Mr. Daniel",male,35.0,0,0,233734,12.35,,Q


In [35]:
titanic_df.filter(
                    "Name like 'K%'"
                 )                                               \
           .orderBy('Name', ascending=True)                      \
           .limit(5)                                       

PassengerId,Survived,Pclass,Name,Sex,Age,SibSp,Parch,Ticket,Fare,Cabin,Embarked
1115,0,3,"Karlsson, Mr. Ein...",male,21.0,0,0,350053,7.7958,,S
1087,0,3,"Karlsson, Mr. Jul...",male,33.0,0,0,347465,7.8542,,S
1138,1,2,"Karnes, Mrs. J Fr...",female,22.0,0,0,F.C.C. 13534,21.0,,S
932,0,3,"Karun, Mr. Franz",male,39.0,0,1,349256,13.4167,,C
908,0,2,"Keane, Mr. Daniel",male,35.0,0,0,233734,12.35,,Q


### and

In [37]:
# Разные варианты одного и того же действия AND

#titanic_df.filter((F.col('Survived') == 1) & (F.col('Pclass') == 1))
#titanic_df.filter(F.col('Survived') == 1).filter(F.col('Pclass') == 1)

#titanic_df.filter((titanic_df['Survived'] == 1) & (titanic_df['Pclass'] == 1))
#titanic_df.filter(titanic_df['Survived'] == 1).filter(titanic_df['Pclass'] == 1)

titanic_df.filter("""Survived == 1 and Pclass == 1""")

titanic_df.limit(5)

PassengerId,Survived,Pclass,Name,Sex,Age,SibSp,Parch,Ticket,Fare,Cabin,Embarked
892,0,3,"Kelly, Mr. James",male,34.5,0,0,330911,7.8292,,Q
893,1,3,"Wilkes, Mrs. Jame...",female,47.0,1,0,363272,7.0,,S
894,0,2,"Myles, Mr. Thomas...",male,62.0,0,0,240276,9.6875,,Q
895,0,3,"Wirz, Mr. Albert",male,27.0,0,0,315154,8.6625,,S
896,1,3,"Hirvonen, Mrs. Al...",female,22.0,1,1,3101298,12.2875,,S


### or

In [41]:
# 3 варианта действия or
#titanic_df.filter((F.col('Survived') == 1) | (F.col('Pclass') == 1))
#titanic_df.filter((titanic_df['Survived'] == 1) | (titanic_df['Pclass'] == 1))
titanic_df.filter("""Survived == 1 or Pclass == 1""")

titanic_df.limit(5)

PassengerId,Survived,Pclass,Name,Sex,Age,SibSp,Parch,Ticket,Fare,Cabin,Embarked
892,0,3,"Kelly, Mr. James",male,34.5,0,0,330911,7.8292,,Q
893,1,3,"Wilkes, Mrs. Jame...",female,47.0,1,0,363272,7.0,,S
894,0,2,"Myles, Mr. Thomas...",male,62.0,0,0,240276,9.6875,,Q
895,0,3,"Wirz, Mr. Albert",male,27.0,0,0,315154,8.6625,,S
896,1,3,"Hirvonen, Mrs. Al...",female,22.0,1,1,3101298,12.2875,,S


### distinct

In [59]:
titanic_df.select(titanic_df['Pclass']).count()

418

In [60]:
titanic_df.select(titanic_df['Pclass']).distinct().count()

3

### join

In [10]:
df_1 = titanic_df.select(
                    'PassengerId',
                    'Name',
                    'Sex'
                  )
df_2 = titanic_df.select(
                    'PassengerId',
                    'Pclass',
                    'Survived'
                  )
joinExpression = df_1['PassengerId'] == df_2['PassengerId']
joined_Df = df_1.join(df_2, joinExpression, 'inner')
joined_Df.limit(5)

PassengerId,Name,Sex,PassengerId.1,Pclass,Survived
892,"Kelly, Mr. James",male,892,3,0
893,"Wilkes, Mrs. Jame...",female,893,3,1
894,"Myles, Mr. Thomas...",male,894,2,0
895,"Wirz, Mr. Albert",male,895,3,0
896,"Hirvonen, Mrs. Al...",female,896,3,1


### union

In [11]:
united = df_1.union(df_2)
united.count()

836

### agg all data

In [18]:
titanic_df.select(F.sum('Age'))

sum(Age)
10050.5


### GroupBy

In [19]:
titanic_df.groupBy('Pclass').sum('Age').show()

+------+--------+
|Pclass|sum(Age)|
+------+--------+
|     1|  4010.0|
|     3| 3508.08|
|     2| 2532.42|
+------+--------+



In [20]:
titanic_df.groupBy('Pclass')                            \
          .agg(                     
                  F.sum('Age'), 
                  F.max('Age'), 
                  F.min('Age'), 
                  F.avg('Age'))

                                                                                

Pclass,sum(Age),max(Age),min(Age),avg(Age)
1,4010.0,76.0,6.0,40.91836734693877
3,3508.08,60.5,0.17,24.02794520547945
2,2532.42,63.0,0.92,28.7775
