<a href="https://colab.research.google.com/github/MichalinaH/DataScienceStudies/blob/main/PAD/SparkSQL.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [1]:
!apt-get install openjdk-11-jdk-headless -qq > /dev/null
!wget -q https://dlcdn.apache.org/spark/spark-3.3.1/spark-3.3.1-bin-hadoop3.tgz

In [2]:
!tar xf spark-3.3.1-bin-hadoop3.tgz

In [3]:
import os
os.environ["JAVA_HOME"] = "/usr/lib/jvm/java-11-openjdk-amd64"
os.environ["SPARK_HOME"] = "/content/spark-3.3.1-bin-hadoop3"

In [4]:
!pip install -q findspark

In [5]:
import findspark

findspark.init()

In [6]:
findspark.find()

'/content/spark-3.3.1-bin-hadoop3'

In [7]:
import multiprocessing

import pyspark

cfg = (
    pyspark.SparkConf()
    # Ustawienie mastera, aby działał lokalnie iz maksymalną liczbą rdzeni procesora do wieloprocesorowości.
    .setMaster(f"local[{multiprocessing.cpu_count()}]")
    # Ustawianie nazwy aplikacji
    .setAppName("TestApp")
    # Ustawienie wartości konfiguracji za pomocą ciągu znaków
    .set("spark.eventLog.enabled", False)
    # Ustawianie zmiennych środowiskowych dla executorów do użycia
    .setExecutorEnv(pairs=[("VAR3", "value3"), ("VAR4", "value4")])
    # Ustawienie pamięci, jeśli to ustawienie nie zostało wcześniej zrobione
    .setIfMissing("spark.executor.memory", "1g")
)

# Pobieranie pojedynczej zmiennej
print(cfg.get("spark.executor.memory"))
# Lista wszystkich z nich w formacie czytelnego napisu (stringa)
print(cfg.toDebugString())

1g
spark.master=local[2]
spark.app.name=TestApp
spark.eventLog.enabled=False
spark.executorEnv.VAR3=value3
spark.executorEnv.VAR4=value4
spark.executor.memory=1g


In [8]:
session = pyspark.sql.SparkSession.builder.config(conf=cfg).getOrCreate()

In [31]:
from pyspark.sql import SparkSession
from pyspark.sql import Row

spark = SparkSession.builder.appName("SpentByCustomer").getOrCreate()

def mapper(line):
    fields = line.split(',')
    return Row(Klient=int(fields[0]), spent=float(fields[2]))

lines = spark.sparkContext.textFile("customer-orders.csv")
s = lines.map(mapper)
schemaSpent = spark.createDataFrame(s).cache()
schemaSpent.createOrReplaceTempView("spent")

query = spark.sql("SELECT Klient, SUM(spent) as total_spent FROM spent GROUP BY Klient")
for q in query.collect():
  print(q)

schemaSpent.groupBy("Klient").sum("spent").show() 
spark.stop()

Row(Klient=29, total_spent=5032.530000000001)
Row(Klient=26, total_spent=5250.4)
Row(Klient=65, total_spent=5140.35)
Row(Klient=54, total_spent=6065.390000000001)
Row(Klient=19, total_spent=5059.43)
Row(Klient=0, total_spent=5524.949999999999)
Row(Klient=22, total_spent=5019.449999999999)
Row(Klient=7, total_spent=4755.069999999999)
Row(Klient=77, total_spent=4327.73)
Row(Klient=34, total_spent=5330.8)
Row(Klient=50, total_spent=4517.2699999999995)
Row(Klient=94, total_spent=4475.570000000001)
Row(Klient=57, total_spent=4628.4)
Row(Klient=43, total_spent=5368.83)
Row(Klient=32, total_spent=5496.05)
Row(Klient=84, total_spent=4652.9400000000005)
Row(Klient=31, total_spent=4765.049999999999)
Row(Klient=98, total_spent=4297.259999999999)
Row(Klient=39, total_spent=6193.110000000001)
Row(Klient=25, total_spent=5057.610000000001)
Row(Klient=95, total_spent=4876.839999999998)
Row(Klient=71, total_spent=5995.660000000002)
Row(Klient=6, total_spent=5397.879999999999)
Row(Klient=68, total_spent

In [33]:
#sortowanie po spent
from pyspark.sql import SparkSession
from pyspark.sql import Row

spark = SparkSession.builder.appName("SpentByCustomerSorted").getOrCreate()

def mapper(line):
    fields = line.split(',')
    return Row(Klient=int(fields[0]), spent=float(fields[2]))

lines = spark.sparkContext.textFile("customer-orders.csv")
s = lines.map(mapper)
schemaSpent = spark.createDataFrame(s).cache()
schemaSpent.createOrReplaceTempView("spentSorted")

query = spark.sql("SELECT Klient, SUM(spent) as total_spent FROM spentSorted GROUP BY Klient ORDER BY total_spent ASC")
#sortowanie od największego będzie DESC
for q in query.collect():
  print(q)

schemaSpent.groupBy("Klient").sum("spent").sort("sum(spent)", ascending=True).show() 
spark.stop()

Row(Klient=45, total_spent=3309.3799999999997)
Row(Klient=79, total_spent=3790.5699999999997)
Row(Klient=96, total_spent=3924.2300000000005)
Row(Klient=23, total_spent=4042.65)
Row(Klient=99, total_spent=4172.29)
Row(Klient=75, total_spent=4178.5)
Row(Klient=36, total_spent=4278.049999999999)
Row(Klient=98, total_spent=4297.259999999999)
Row(Klient=47, total_spent=4316.299999999998)
Row(Klient=77, total_spent=4327.73)
Row(Klient=13, total_spent=4367.619999999999)
Row(Klient=48, total_spent=4384.33)
Row(Klient=49, total_spent=4394.6)
Row(Klient=94, total_spent=4475.570000000001)
Row(Klient=67, total_spent=4505.79)
Row(Klient=50, total_spent=4517.2699999999995)
Row(Klient=78, total_spent=4524.51)
Row(Klient=5, total_spent=4561.07)
Row(Klient=57, total_spent=4628.4)
Row(Klient=83, total_spent=4635.8)
Row(Klient=91, total_spent=4642.26)
Row(Klient=74, total_spent=4647.130000000001)
Row(Klient=84, total_spent=4652.9400000000005)
Row(Klient=3, total_spent=4659.63)
Row(Klient=12, total_spent=

In [34]:
#sortowanie po kliencie 
from pyspark.sql import SparkSession
from pyspark.sql import Row

spark = SparkSession.builder.appName("SpentByCustomerSorted").getOrCreate()

def mapper(line):
    fields = line.split(',')
    return Row(Klient=int(fields[0]), spent=float(fields[2]))

lines = spark.sparkContext.textFile("customer-orders.csv")
s = lines.map(mapper)
schemaSpent = spark.createDataFrame(s).cache()
schemaSpent.createOrReplaceTempView("spentSorted")

query = spark.sql("SELECT Klient, SUM(spent) as total_spent FROM spentSorted GROUP BY Klient ORDER BY Klient ASC")
#sortowanie od największego będzie DESC
for q in query.collect():
  print(q)
  
schemaSpent.groupBy("Klient").sum("spent").sort("Klient", ascending=True).show()  
spark.stop()

Row(Klient=0, total_spent=5524.949999999999)
Row(Klient=1, total_spent=4958.600000000001)
Row(Klient=2, total_spent=5994.59)
Row(Klient=3, total_spent=4659.63)
Row(Klient=4, total_spent=4815.050000000001)
Row(Klient=5, total_spent=4561.07)
Row(Klient=6, total_spent=5397.879999999999)
Row(Klient=7, total_spent=4755.069999999999)
Row(Klient=8, total_spent=5517.24)
Row(Klient=9, total_spent=5322.65)
Row(Klient=10, total_spent=4819.7)
Row(Klient=11, total_spent=5152.289999999999)
Row(Klient=12, total_spent=4664.589999999999)
Row(Klient=13, total_spent=4367.619999999999)
Row(Klient=14, total_spent=4735.030000000001)
Row(Klient=15, total_spent=5413.51)
Row(Klient=16, total_spent=4979.06)
Row(Klient=17, total_spent=5032.68)
Row(Klient=18, total_spent=4921.269999999999)
Row(Klient=19, total_spent=5059.43)
Row(Klient=20, total_spent=4836.860000000001)
Row(Klient=21, total_spent=4707.41)
Row(Klient=22, total_spent=5019.449999999999)
Row(Klient=23, total_spent=4042.65)
Row(Klient=24, total_spent=