### Instaliavimas

1. Patikrinama python versija sistemoje:
        python --version
        (dabar python 3.9.7)
        
2. Įsirašoma Anaconda ir sukuriamas naujas environment su ta pačia python versija:
       conda create -c conda-forge python=3.9.7 -n spark
       
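3. Aktyvuojamas environment ir jame per pip įrašomi pyspark, JupyterLab ir Java (JDK) – Java būtinai per pip:
        conda activate spark
        pip install pyspark
        pip install jupyterlab
        pip install install-jdk
        python -c "import jdk; print(jdk.install('17'))"
   Paketas install-jdk tik suteikia įrankį JDK atsisiuntimui – pats JDK įdiegiamas paskutine komanda, kuri atspausdina įdiegto JDK katalogą. Šį katalogą nurodykite aplinkos kintamajame JAVA_HOME. JDK versija turi būti suderinama su įdiegta PySpark versija (pvz., PySpark 4.x reikalauja Java 17 ar naujesnės).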

# Pagrindai

In [1]:
# Start of the working session: a local Spark session using every available CPU core.

from pyspark.sql import SparkSession, functions as F


spark = (
    SparkSession.builder
    .master("local[*]")
    .getOrCreate()
)

In [16]:
# Read the document: a JSON file whose records may span several lines (multiLine).
# NOTE(review): hard-coded absolute Windows path — consider making it configurable.

MOCK_DATA_PATH = "C://MOCK_DATA.json"

df = (
    spark.read
    .option("multiLine", True)
    .json(MOCK_DATA_PATH)
)

df.show()

+--------------------+------------+-----------+---+---------------+----------+
|               email|  first_name|     gender| id|     ip_address| last_name|
+--------------------+------------+-----------+---+---------------+----------+
|bkynder0@pinteres...|       Bibby|     Female|  1|  196.100.78.83|    Kynder|
|gthaxton1@google....|        Gris|       Male|  2| 231.63.126.194|   Thaxton|
|mevershed2@techcr...|Massimiliano|Genderqueer|  3|  50.193.119.80|  Evershed|
|ldaulby3@sakura.n...|     Leonard|     Female|  4| 18.239.231.228|   D'Aulby|
|    lcaress4@nps.gov|     Lorelle|       Male|  5|  145.118.13.17|    Caress|
|  cmasserel5@free.fr|      Carola|       Male|  6|  156.37.181.31|  Masserel|
|btooth6@fastcompa...|        Bari|       Male|  7|224.254.175.240|     Tooth|
|ewemes7@infoseek....|   Ekaterina|       Male|  8|154.250.143.162|     Wemes|
|  pvedeneev8@cdc.gov|     Patrice|       Male|  9|    127.10.4.53|  Vedeneev|
|  glamasna9@xing.com|   Georgetta|       Male| 10| 

In [3]:
# Reorder the columns into a readable order and sort the rows by id.
# NOTE: this rebinds `df`; later cells that access row fields by position
# depend on this exact column order.

df = (
    df.select("id", "first_name", "last_name", "gender", "ip_address", "email")
    .sort("id")
)
df.show()

+---+------------+----------+-----------+---------------+--------------------+
| id|  first_name| last_name|     gender|     ip_address|               email|
+---+------------+----------+-----------+---------------+--------------------+
|  1|       Bibby|    Kynder|     Female|  196.100.78.83|bkynder0@pinteres...|
|  2|        Gris|   Thaxton|       Male| 231.63.126.194|gthaxton1@google....|
|  3|Massimiliano|  Evershed|Genderqueer|  50.193.119.80|mevershed2@techcr...|
|  4|     Leonard|   D'Aulby|     Female| 18.239.231.228|ldaulby3@sakura.n...|
|  5|     Lorelle|    Caress|       Male|  145.118.13.17|    lcaress4@nps.gov|
|  6|      Carola|  Masserel|       Male|  156.37.181.31|  cmasserel5@free.fr|
|  7|        Bari|     Tooth|       Male|224.254.175.240|btooth6@fastcompa...|
|  8|   Ekaterina|     Wemes|       Male|154.250.143.162|ewemes7@infoseek....|
|  9|     Patrice|  Vedeneev|       Male|    127.10.4.53|  pvedeneev8@cdc.gov|
| 10|   Georgetta|   Lamasna|       Male|   41.159.2

In [4]:
# Grouping and aggregation: count rows per gender, then rename the two
# resulting columns to "Lytis" (gender) and "Populiacija" (population).

df1 = (
    df.groupBy("gender")
    .count()
    .toDF("Lytis", "Populiacija")  # positional rename of [gender, count]
)

df1.show()


+-----------+-----------+
|      Lytis|Populiacija|
+-----------+-----------+
|Genderqueer|         16|
|    Agender|          9|
|     Female|        469|
| Polygender|         17|
|   Bigender|         15|
| Non-binary|         17|
|       Male|        439|
|Genderfluid|         18|
+-----------+-----------+



In [5]:
# Filtering: `filter` and `where` are aliases.
# Keep rows whose first name OR last name starts with "A".

df2 = df.filter(df.first_name.startswith("A") |
                df.last_name.startswith("A"))
# `.show()` only prints and returns None, so it must not be chained into the
# assignment — otherwise df2 would hold None instead of the filtered DataFrame.
df2.show()


+---+----------+-----------+-----------+---------------+--------------------+
| id|first_name|  last_name|     gender|     ip_address|               email|
+---+----------+-----------+-----------+---------------+--------------------+
| 12|     Andie|    Speddin|     Female| 54.108.162.248|aspeddinb@seattle...|
| 21|    Marlie|    Allcorn|       Male|   20.58.48.233|mallcornk@shinyst...|
| 29|   Dorothy|    Andriss|       Male|    91.12.34.62|dandrisss@geociti...|
| 49|    Aylmar|  Winnister|     Female|  105.98.198.18|awinnister1c@woot...|
| 52|   Aurelea|   Nesfield|       Male|  94.127.209.36| anesfield1f@sun.com|
| 59|  Alastair|    Whitton|Genderfluid|   204.184.17.9|awhitton1m@networ...|
| 61|  Adrienne|    Ragless|    Agender|   88.213.153.7|aragless1o@drupal...|
| 67|   Suzanna|      Axton|     Female|  115.56.171.61|  saxton1u@webmd.com|
| 75|   Glendon|      Aggio|     Female|  139.208.22.24|gaggio22@washingt...|
| 83|    Annice|Bridgestock|       Male| 213.36.231.174|abridges

In [6]:
# Broadcast: recode categorical values (e.g. create abbreviations, or expand
# them: USA -> United States). The lookup dict is sent to the workers once as a
# read-only broadcast variable instead of with every task.

genders = {"Male": "M", "Female": "F",
           "Genderqueer": "Other", "Agender": "Other",
           "Polygender": "Other", "Bigender": "Other",
           "Non-binary": "Other", "Genderfluid": "Other"}

broadcastGender = spark.sparkContext.broadcast(genders)

def gender_converter(code):
    """Return the short gender code for `code` using the broadcast mapping.

    Values that are not in the mapping (including None) are returned
    unchanged instead of raising KeyError and failing the whole Spark job.
    """
    return broadcastGender.value.get(code, code)

# Locate the gender column by name rather than hard-coding position 3, so the
# cell works regardless of the order of df's columns.
gender_idx = df.columns.index("gender")

result = df.rdd.map(
    lambda row: tuple(
        gender_converter(value) if i == gender_idx else value
        for i, value in enumerate(row)
    )
).toDF(df.schema)

result.show()


+---+------------+----------+------+---------------+--------------------+
| id|  first_name| last_name|gender|     ip_address|               email|
+---+------------+----------+------+---------------+--------------------+
|  1|       Bibby|    Kynder|     F|  196.100.78.83|bkynder0@pinteres...|
|  2|        Gris|   Thaxton|     M| 231.63.126.194|gthaxton1@google....|
|  3|Massimiliano|  Evershed| Other|  50.193.119.80|mevershed2@techcr...|
|  4|     Leonard|   D'Aulby|     F| 18.239.231.228|ldaulby3@sakura.n...|
|  5|     Lorelle|    Caress|     M|  145.118.13.17|    lcaress4@nps.gov|
|  6|      Carola|  Masserel|     M|  156.37.181.31|  cmasserel5@free.fr|
|  7|        Bari|     Tooth|     M|224.254.175.240|btooth6@fastcompa...|
|  8|   Ekaterina|     Wemes|     M|154.250.143.162|ewemes7@infoseek....|
|  9|     Patrice|  Vedeneev|     M|    127.10.4.53|  pvedeneev8@cdc.gov|
| 10|   Georgetta|   Lamasna|     M|   41.159.251.1|  glamasna9@xing.com|
| 11|     Cristal|    Estcot|     M|  

In [7]:
# Joins

# Sample datasets:
print("Left dataset")
emp = [(1, "Smith", -1, "2018", "10", "M", 3000),
       (2, "Rose", 1, "2010", "20", "M", 4000),
       (3, "Williams", 1, "2010", "10", "M", 1000),
       (4, "Jones", 2, "2005", "10", "F", 2000),
       (5, "Brown", 2, "2010", "40", "", -1),
       (6, "Brown", 2, "2010", "50", "", -1)]
empColumns = ["emp_id", "name", "superior_emp_id", "year_joined",
              "emp_dept_id", "gender", "salary"]

empDF = spark.createDataFrame(data=emp, schema=empColumns)
empDF.show(truncate=False)

print("Right dataset")
dept = [("Finance", 10),
        ("Marketing", 20),
        ("Sales", 30),
        ("IT", 40)]
deptColumns = ["dept_name", "dept_id"]
deptDF = spark.createDataFrame(data=dept, schema=deptColumns)
deptDF.show()

# ------------------------------------------------------------------------------
# Note: emp_dept_id holds strings ("10") while dept_id holds integers (10);
# Spark casts implicitly when comparing them.
join_condition = deptDF.dept_id == empDF.emp_dept_id

print("Inner")
# Kept as a named result: later cells compute statistics on umpDF.
umpDF = empDF.join(deptDF, join_condition, "inner")
umpDF.show()

for join_type in ["outer", "left", "right"]:
    print(join_type.capitalize())
    empDF.join(deptDF, join_condition, join_type).show()

# A real cross join is the Cartesian product and takes no condition; calling
# join(..., condition, "cross") with a condition just behaves like an inner join.
print("Cross")
empDF.crossJoin(deptDF).show()

# Anti: left rows with no match; Semi: left rows with a match (left columns only).
for join_type in ["anti", "semi"]:
    print(join_type.capitalize())
    empDF.join(deptDF, join_condition, join_type).show()

Left dataset
+------+--------+---------------+-----------+-----------+------+------+
|emp_id|name    |superior_emp_id|year_joined|emp_dept_id|gender|salary|
+------+--------+---------------+-----------+-----------+------+------+
|1     |Smith   |-1             |2018       |10         |M     |3000  |
|2     |Rose    |1              |2010       |20         |M     |4000  |
|3     |Williams|1              |2010       |10         |M     |1000  |
|4     |Jones   |2              |2005       |10         |F     |2000  |
|5     |Brown   |2              |2010       |40         |      |-1    |
|6     |Brown   |2              |2010       |50         |      |-1    |
+------+--------+---------------+-----------+-----------+------+------+

Right dataset
+---------+-------+
|dept_name|dept_id|
+---------+-------+
|  Finance|     10|
|Marketing|     20|
|    Sales|     30|
|       IT|     40|
+---------+-------+

Inner
+------+--------+---------------+-----------+-----------+------+------+---------+----

In [97]:
# Merge / Union of two DataFrames.
# NOTE: `union` matches columns by position (not by name) and keeps duplicate
# rows (SQL UNION ALL semantics) — James and Maria appear in both inputs.

# First dataset
columns = ["employee_name", "department", "state", "salary", "age", "bonus"]
simpleData = [
    ("James", "Sales", "NY", 90000, 34, 10000),
    ("Michael", "Sales", "NY", 86000, 56, 20000),
    ("Robert", "Sales", "CA", 81000, 30, 23000),
    ("Maria", "Finance", "CA", 90000, 24, 23000),
]
df10 = spark.createDataFrame(data=simpleData, schema=columns)
df10.show()

# Second dataset with the same structure
columns2 = list(columns)
simpleData2 = [
    ("James", "Sales", "NY", 90000, 34, 10000),
    ("Maria", "Finance", "CA", 90000, 24, 23000),
    ("Jen", "Finance", "NY", 79000, 53, 15000),
    ("Jeff", "Marketing", "CA", 80000, 25, 18000),
    ("Kumar", "Marketing", "NY", 91000, 50, 21000),
]
df12 = spark.createDataFrame(data=simpleData2, schema=columns2)
df12.show(truncate=False)

# Union
unionDF = df10.union(df12)
unionDF.show(truncate=False)



+-------------+----------+-----+------+---+-----+
|employee_name|department|state|salary|age|bonus|
+-------------+----------+-----+------+---+-----+
|        James|     Sales|   NY| 90000| 34|10000|
|      Michael|     Sales|   NY| 86000| 56|20000|
|       Robert|     Sales|   CA| 81000| 30|23000|
|        Maria|   Finance|   CA| 90000| 24|23000|
+-------------+----------+-----+------+---+-----+

+-------------+----------+-----+------+---+-----+
|employee_name|department|state|salary|age|bonus|
+-------------+----------+-----+------+---+-----+
|James        |Sales     |NY   |90000 |34 |10000|
|Maria        |Finance   |CA   |90000 |24 |23000|
|Jen          |Finance   |NY   |79000 |53 |15000|
|Jeff         |Marketing |CA   |80000 |25 |18000|
|Kumar        |Marketing |NY   |91000 |50 |21000|
+-------------+----------+-----+------+---+-----+

+-------------+----------+-----+------+---+-----+
|employee_name|department|state|salary|age|bonus|
+-------------+----------+-----+------+---+-----

In [98]:
# Writing SQL queries in Spark: register the DataFrame as a temporary view,
# then query it by that view name.

df.createOrReplaceTempView("NAME_TABLE1")

query = "select first_name from NAME_TABLE1"
spark.sql(query).show(truncate=False)

+------------+
|first_name  |
+------------+
|Bibby       |
|Gris        |
|Massimiliano|
|Leonard     |
|Lorelle     |
|Carola      |
|Bari        |
|Ekaterina   |
|Patrice     |
|Georgetta   |
|Cristal     |
|Andie       |
|Saundra     |
|Corella     |
|Gamaliel    |
|Burk        |
|Crysta      |
|Lorelei     |
|Bond        |
|Marcie      |
+------------+
only showing top 20 rows



In [99]:
# Map function example: merge the first-name and last-name columns into a
# single "Vardas Pavardė" (first name + last name) column.

rdd2 = df.rdd.map(lambda row: (
    row["id"],
    # Null-safe: `str + None` would raise TypeError and fail the job, so
    # missing name parts are skipped instead.
    " ".join(part for part in (row["first_name"], row["last_name"])
             if part is not None),
    row["gender"],
    row["ip_address"],
    row["email"],
))  # fields are read by name, so the cell does not depend on df's column order

df16 = rdd2.toDF(["ID", "Vardas Pavardė", "Lytis", "IP adressas", "El. paštas"])
df16.show()

+---+--------------------+-----------+---------------+--------------------+
| ID|      Vardas Pavardė|      Lytis|    IP adressas|          El. paštas|
+---+--------------------+-----------+---------------+--------------------+
|  1|        Bibby Kynder|     Female|  196.100.78.83|bkynder0@pinteres...|
|  2|        Gris Thaxton|       Male| 231.63.126.194|gthaxton1@google....|
|  3|Massimiliano Ever...|Genderqueer|  50.193.119.80|mevershed2@techcr...|
|  4|     Leonard D'Aulby|     Female| 18.239.231.228|ldaulby3@sakura.n...|
|  5|      Lorelle Caress|       Male|  145.118.13.17|    lcaress4@nps.gov|
|  6|     Carola Masserel|       Male|  156.37.181.31|  cmasserel5@free.fr|
|  7|          Bari Tooth|       Male|224.254.175.240|btooth6@fastcompa...|
|  8|     Ekaterina Wemes|       Male|154.250.143.162|ewemes7@infoseek....|
|  9|    Patrice Vedeneev|       Male|    127.10.4.53|  pvedeneev8@cdc.gov|
| 10|   Georgetta Lamasna|       Male|   41.159.251.1|  glamasna9@xing.com|
| 11|      C

In [100]:
# `F.lit` wraps a constant value; here it appends a "Country" column holding the
# same value for every row (equivalent to `withColumn`, which adds a column).

df_ups = df16.select(
    "*",
    F.lit("United States of America").alias("Country"),
)

df_ups.show()

+---+--------------------+-----------+---------------+--------------------+--------------------+
| ID|      Vardas Pavardė|      Lytis|    IP adressas|          El. paštas|             Country|
+---+--------------------+-----------+---------------+--------------------+--------------------+
|  1|        Bibby Kynder|     Female|  196.100.78.83|bkynder0@pinteres...|United States of ...|
|  2|        Gris Thaxton|       Male| 231.63.126.194|gthaxton1@google....|United States of ...|
|  3|Massimiliano Ever...|Genderqueer|  50.193.119.80|mevershed2@techcr...|United States of ...|
|  4|     Leonard D'Aulby|     Female| 18.239.231.228|ldaulby3@sakura.n...|United States of ...|
|  5|      Lorelle Caress|       Male|  145.118.13.17|    lcaress4@nps.gov|United States of ...|
|  6|     Carola Masserel|       Male|  156.37.181.31|  cmasserel5@free.fr|United States of ...|
|  7|          Bari Tooth|       Male|224.254.175.240|btooth6@fastcompa...|United States of ...|
|  8|     Ekaterina Wemes|    

In [101]:
# How to build a pivot table: one row per country, one column per distinct
# "Lytis" (gender) value, each cell holding the row count.

pivot_counts = df_ups.groupBy("Country").pivot("Lytis").count()
pivot_counts.show()


+--------------------+-------+--------+------+-----------+-----------+----+----------+----------+
|             Country|Agender|Bigender|Female|Genderfluid|Genderqueer|Male|Non-binary|Polygender|
+--------------------+-------+--------+------+-----------+-----------+----+----------+----------+
|United States of ...|      9|      15|   469|         18|         16| 439|        17|        17|
+--------------------+-------+--------+------+-----------+-----------+----+----------+----------+



In [132]:
# Descriptive statistics of the salary column of the inner-join result (umpDF).

# Summary table: count, mean, stddev, min, quartiles, max
umpDF.select("salary").summary().show()

# Mean
umpDF.select(F.mean("salary").alias("Average")).show()

# Approximate number of DISTINCT salary values (an estimate, not a row count)
umpDF.select(F.approx_count_distinct("salary").alias("Approx Count Distinct")).show()

# Set of unique values collected into one array
umpDF.select(F.collect_set("salary").alias("List of Unique")).show()

# Exact number of distinct values
umpDF.select(F.countDistinct("salary").alias("Count of Distinct")).show()

# First and last row values.
# NOTE: without an explicit ordering Spark treats first/last as
# non-deterministic — the result depends on row order after shuffles.
umpDF.select(F.first("salary")).show()
umpDF.select(F.last("salary")).show()

# Excess kurtosis (peakedness)
umpDF.select(F.kurtosis("salary")).show()

# Skewness (asymmetry coefficient)
umpDF.select(F.skewness("salary")).show()

# Standard deviation: population and sample
umpDF.select(F.stddev_pop("salary")).show()
umpDF.select(F.stddev_samp("salary")).show()


+-------+------------------+
|summary|            salary|
+-------+------------------+
|  count|                 5|
|   mean|            1999.8|
| stddev|1581.4550894666595|
|    min|                -1|
|    max|              4000|
+-------+------------------+

+-------+------------------+
|summary|            salary|
+-------+------------------+
|  count|                 5|
|   mean|            1999.8|
| stddev|1581.4550894666595|
|    min|                -1|
|    25%|              1000|
|    50%|              2000|
|    75%|              3000|
|    max|              4000|
+-------+------------------+

+-------+
|Average|
+-------+
| 1999.8|
+-------+

+-----+
|Count|
+-----+
|    5|
+-----+

+--------------------+
|      List of Unique|
+--------------------+
|[-1, 3000, 1000, ...|
+--------------------+

+-----------------+
|Count of Distinct|
+-----------------+
|                5|
+-----------------+

+-------------+
|first(salary)|
+-------------+
|         3000|
+-------------+


In [27]:
# Window functions

from pyspark.sql.window import Window

# partitionBy selects the subset of rows within which row_number is computed.
# With no partition columns the whole dataset is one window, so every row gets
# a unique sequential number (Spark moves all rows into a single partition).
windowSpec = Window.partitionBy().orderBy("id")
# Row number
df_new = df.withColumn("row_number", F.row_number().over(windowSpec))

df_new.show(20)

# Row number within each gender, ordered by id.
# df has no "department" or "name" columns (referencing them raises an
# AnalysisException); partition by gender and order by id as intended.
windowSpec1 = Window.partitionBy("gender").orderBy("id")
# Row number within the partition
df_new1 = df.withColumn("Eil_nr", F.row_number().over(windowSpec1))
df_new1.show(20)


+--------------------+------------+-----------+---+---------------+----------+----------+
|               email|  first_name|     gender| id|     ip_address| last_name|row_number|
+--------------------+------------+-----------+---+---------------+----------+----------+
|bkynder0@pinteres...|       Bibby|     Female|  1|  196.100.78.83|    Kynder|         1|
|gthaxton1@google....|        Gris|       Male|  2| 231.63.126.194|   Thaxton|         2|
|mevershed2@techcr...|Massimiliano|Genderqueer|  3|  50.193.119.80|  Evershed|         3|
|ldaulby3@sakura.n...|     Leonard|     Female|  4| 18.239.231.228|   D'Aulby|         4|
|    lcaress4@nps.gov|     Lorelle|       Male|  5|  145.118.13.17|    Caress|         5|
|  cmasserel5@free.fr|      Carola|       Male|  6|  156.37.181.31|  Masserel|         6|
|btooth6@fastcompa...|        Bari|       Male|  7|224.254.175.240|     Tooth|         7|
|ewemes7@infoseek....|   Ekaterina|       Male|  8|154.250.143.162|     Wemes|         8|
|  pvedene

In [17]:
# Get only the duplicate rows from a dataset.
# NOTE(review): hard-coded absolute Windows path — consider making it configurable.

df_sp = spark.read.json("C://example.json", multiLine=True)

# Select id explicitly so the sort key is present, then drop it: rows that
# differ only by id must be treated as duplicates.
df_sp = df_sp.select("id",
                     "first_name",
                     "last_name",
                     "gender",
                     "ip_address",
                     "email").sort("id")
df_sp1 = df_sp.drop("id")

print("Original")
df_sp1.show()

# Group by every column of THIS DataFrame (the original code used df1.columns,
# i.e. ["Lytis", "Populiacija"] from an unrelated DataFrame, which fails) and
# keep only the combinations that occur more than once.
df_sp2 = (
    df_sp1.groupBy(df_sp1.columns)
    .count()
    .filter(F.col("count") > 1)
    .drop("count")
)
print("Dublicates only")
df_sp2.show()

Original
+------------+---------+-----------+---------------+--------------------+
|  first_name|last_name|     gender|     ip_address|               email|
+------------+---------+-----------+---------------+--------------------+
|       Bibby|   Kynder|     Female|  196.100.78.83|bkynder0@pinteres...|
|        Gris|  Thaxton|       Male| 231.63.126.194|gthaxton1@google....|
|Massimiliano| Evershed|Genderqueer|  50.193.119.80|mevershed2@techcr...|
|     Leonard|  D'Aulby|     Female| 18.239.231.228|ldaulby3@sakura.n...|
|     Lorelle|   Caress|       Male|  145.118.13.17|    lcaress4@nps.gov|
|      Carola| Masserel|       Male|  156.37.181.31|  cmasserel5@free.fr|
|        Bari|    Tooth|       Male|224.254.175.240|btooth6@fastcompa...|
|   Ekaterina|    Wemes|       Male|154.250.143.162|ewemes7@infoseek....|
|     Patrice| Vedeneev|       Male|    127.10.4.53|  pvedeneev8@cdc.gov|
|   Georgetta|  Lamasna|       Male|   41.159.251.1|  glamasna9@xing.com|
|     Leonard|  D'Aulby|     