In [1]:
from pyspark.sql.functions import col, udf
from pyspark.sql.types import StringType
from pyspark.sql import SparkSession
import pyspark
import quinn

sc = pyspark.SparkContext('local[*]')
rdd = sc.parallelize(range(100))

In [2]:
rdd.takeSample(False, 13)

[61, 42, 95, 90, 65, 5, 12, 2, 70, 19, 84, 38, 22]

In [3]:
rdd = sc.parallelize([
    {'Name': 'Anastasiya', 'Salary': 2000, 'Company': 'Grid Dynamics'},
    {'Name': 'Gleb', 'Salary': 1200, 'Company': 'Haulmont'},
    {'Name': 'Roman', 'Salary': 1500, 'Company': 'Mirantis'},
    {'Name': 'Oleg', 'Salary': 1300, 'Company': 'OpenDev'},
    {'Name': 'Artem', 'Salary': 500, 'Company': 'Mercury'},
    {'Name': 'Alina', 'Salary': 2000, 'Company': 'EPAM'},
    {'Name': 'Arina', 'Salary': 1800, 'Company': 'Grid Dynamics'},
    {'Name': 'Sveta', 'Salary': 1000, 'Company': 'Exactpro'},
    {'Name': 'Rita', 'Salary': 1000, 'Company': 'Neoflex'}
])

spark = SparkSession.builder.getOrCreate()
Salary_information = spark.createDataFrame(rdd)
Salary_information.show()

+-------------+----------+------+
|      Company|      Name|Salary|
+-------------+----------+------+
|Grid Dynamics|Anastasiya|  2000|
|     Haulmont|      Gleb|  1200|
|     Mirantis|     Roman|  1500|
|      OpenDev|      Oleg|  1300|
|      Mercury|     Artem|   500|
|         EPAM|     Alina|  2000|
|Grid Dynamics|     Arina|  1800|
|     Exactpro|     Sveta|  1000|
|      Neoflex|      Rita|  1000|
+-------------+----------+------+



In [4]:
Salary_information.select('Company').show()

+-------------+
|      Company|
+-------------+
|Grid Dynamics|
|     Haulmont|
|     Mirantis|
|      OpenDev|
|      Mercury|
|         EPAM|
|Grid Dynamics|
|     Exactpro|
|      Neoflex|
+-------------+



In [5]:
Salary_information.filter(col('Salary') == '2000').show()

+-------------+----------+------+
|      Company|      Name|Salary|
+-------------+----------+------+
|Grid Dynamics|Anastasiya|  2000|
|         EPAM|     Alina|  2000|
+-------------+----------+------+



In [6]:
Salary_information.filter("Name == 'Oleg' or Name = 'Gleb'").select('Company').show()

+--------+
| Company|
+--------+
|Haulmont|
| OpenDev|
+--------+



In [7]:
Salary_information.groupby('Company').count().show()

+-------------+-----+
|      Company|count|
+-------------+-----+
|Grid Dynamics|    2|
|     Haulmont|    1|
|     Mirantis|    1|
|      OpenDev|    1|
|      Mercury|    1|
|         EPAM|    1|
|     Exactpro|    1|
|      Neoflex|    1|
+-------------+-----+



In [8]:
Purchase = spark.read.option('header', 'true').csv('result.csv')
Purchase.printSchema()

root
 |-- time: string (nullable = true)
 |-- number_of_purchases: string (nullable = true)
 |-- location_and_number_of_visits: string (nullable = true)



In [9]:
Purchase.show(truncate=False)

+-----+-------------------+-----------------------------------------------+
|time |number_of_purchases|location_and_number_of_visits                  |
+-----+-------------------+-----------------------------------------------+
|09:00|4                  |{'Australia':5,'United States':6,'Macedonia':2}|
|10:00|2                  |{'United States':11,'Peru':1,'Belgium':3}      |
|11:00|4                  |{'China':4,'Turkey':10,'Thailand':3}           |
|12:00|3                  |{'Japan':11,'China':2,'Sweden':1}              |
|13:00|7                  |{'Macedonia':9,'United States':5,'Belgium':5]  |
|14:00|6                  |{'Albania':8,'Japan':3,'United States':12}     |
|15:00|7                  |{'China':9,'Hungary':2,'Belarus':11}           |
|16:00|7                  |{'Bolivia':10,'Japan':3,'United States':12}    |
+-----+-------------------+-----------------------------------------------+



In [10]:
Purchase.groupBy('number_of_purchases').count().show()

+-------------------+-----+
|number_of_purchases|count|
+-------------------+-----+
|                  7|    3|
|                  3|    1|
|                  6|    1|
|                  4|    2|
|                  2|    1|
+-------------------+-----+



In [11]:
@udf(returnType=StringType())
def swithing_to_Moscow_time(value: str):
    value = int(value[:2]) - 1
    return str(value) + ':00'

def rename_columns(name_col):
    name_col = name_col.replace("_", " ")
    return name_col.capitalize()

Purchase = quinn.with_columns_renamed(rename_columns)(Purchase)
Purchase = Purchase.withColumn('Moscow time', swithing_to_Moscow_time(col('time')))

Purchase = Purchase.toPandas()
Purchase

Unnamed: 0,Time,Number of purchases,Location and number of visits,Moscow time
0,09:00,4,"{'Australia':5,'United States':6,'Macedonia':2}",8:00
1,10:00,2,"{'United States':11,'Peru':1,'Belgium':3}",9:00
2,11:00,4,"{'China':4,'Turkey':10,'Thailand':3}",10:00
3,12:00,3,"{'Japan':11,'China':2,'Sweden':1}",11:00
4,13:00,7,"{'Macedonia':9,'United States':5,'Belgium':5]",12:00
5,14:00,6,"{'Albania':8,'Japan':3,'United States':12}",13:00
6,15:00,7,"{'China':9,'Hungary':2,'Belarus':11}",14:00
7,16:00,7,"{'Bolivia':10,'Japan':3,'United States':12}",15:00
