Instalacja wymaganych pakietów

In [1]:
! pip install pyspark==3.0.1 py4j==0.10.9



Tworzenie Spark session

In [2]:
from pyspark.sql import SparkSession
spark = SparkSession.builder\
        .master("local[*]")\
        .appName('PySpark_Tutorial')\
        .getOrCreate()
# gdzie "*" znaczy wszystkie rdzenie procesora.

Czytanie danych

In [3]:
# Czytanie CSV plika
csv_file = 'IHME-GBD_2019_DATA-15798851-2.csv'
df = spark.read.csv(csv_file)

Strukturyzacja danych za pomocą schematu Spark

In [4]:
data = spark.read.csv(
      "IHME-GBD_2019_DATA-15798851-2.csv",
      sep=',',
      header=True,
      )
data.printSchema()

root
 |-- measure: string (nullable = true)
 |-- location: string (nullable = true)
 |-- sex: string (nullable = true)
 |-- age: string (nullable = true)
 |-- cause: string (nullable = true)
 |-- metric: string (nullable = true)
 |-- year: string (nullable = true)
 |-- val: string (nullable = true)
 |-- upper: string (nullable = true)
 |-- lower: string (nullable = true)



Manualna strukturyzacja danych

In [5]:
from pyspark.sql.types import *

data_schema = [
    StructField('measure', StringType(), True), #czy dopuszczalna jest wartość null
    StructField('location', StringType(), True),
    StructField('year', IntegerType(), False),
    StructField('var', DoubleType(), False),
]

final_struc = StructType(fields = data_schema)
data2 = spark.read.csv(
      "IHME-GBD_2019_DATA-15798851-2.csv",
      sep=',',
      header=True,
      schema=final_struc
)

data2.printSchema()

root
 |-- measure: string (nullable = true)
 |-- location: string (nullable = true)
 |-- year: integer (nullable = true)
 |-- var: double (nullable = true)



Kontrola danych

In [6]:
data2.schema

StructType(List(StructField(measure,StringType,true),StructField(location,StringType,true),StructField(year,IntegerType,true),StructField(var,DoubleType,true)))

In [7]:
data2.dtypes

[('measure', 'string'),
 ('location', 'string'),
 ('year', 'int'),
 ('var', 'double')]

In [8]:
data2.head

<bound method DataFrame.head of DataFrame[measure: string, location: string, year: int, var: double]>

Manipulacja kolumnami

In [9]:
data = data.withColumn('copy_location', data.location)
data.show(5)

+--------------------+--------+------+--------+--------------------+------+----+------------------+------------------+------------------+-------------+
|             measure|location|   sex|     age|               cause|metric|year|               val|             upper|             lower|copy_location|
+--------------------+--------+------+--------+--------------------+------+----+------------------+------------------+------------------+-------------+
|DALYs (Disability...|  Gambia|Female|All Ages|Maternal and neon...|  Rate|2012| 7475.212699705153| 9104.773540846287| 6157.428602624385|       Gambia|
|DALYs (Disability...|  Gambia|  Both|All Ages|Maternal and neon...|  Rate|2012| 7814.344518002015| 9667.960848348446| 6289.146374740097|       Gambia|
|DALYs (Disability...|  Gambia|  Male|All Ages|Substance use dis...|Number|2012| 1659.038707247863| 2126.829520886102|1239.1726985245457|       Gambia|
|DALYs (Disability...|  Gambia|Female|All Ages|Substance use dis...|Number|2012| 874.432

In [10]:
data = data.withColumnRenamed('copy_location', 'copy_location2') #zmiana nazwy
data.show(5)

+--------------------+--------+------+--------+--------------------+------+----+------------------+------------------+------------------+--------------+
|             measure|location|   sex|     age|               cause|metric|year|               val|             upper|             lower|copy_location2|
+--------------------+--------+------+--------+--------------------+------+----+------------------+------------------+------------------+--------------+
|DALYs (Disability...|  Gambia|Female|All Ages|Maternal and neon...|  Rate|2012| 7475.212699705153| 9104.773540846287| 6157.428602624385|        Gambia|
|DALYs (Disability...|  Gambia|  Both|All Ages|Maternal and neon...|  Rate|2012| 7814.344518002015| 9667.960848348446| 6289.146374740097|        Gambia|
|DALYs (Disability...|  Gambia|  Male|All Ages|Substance use dis...|Number|2012| 1659.038707247863| 2126.829520886102|1239.1726985245457|        Gambia|
|DALYs (Disability...|  Gambia|Female|All Ages|Substance use dis...|Number|2012| 8

In [11]:
data = data.drop('copy_location2') #kasacja
data.show(5)

+--------------------+--------+------+--------+--------------------+------+----+------------------+------------------+------------------+
|             measure|location|   sex|     age|               cause|metric|year|               val|             upper|             lower|
+--------------------+--------+------+--------+--------------------+------+----+------------------+------------------+------------------+
|DALYs (Disability...|  Gambia|Female|All Ages|Maternal and neon...|  Rate|2012| 7475.212699705153| 9104.773540846287| 6157.428602624385|
|DALYs (Disability...|  Gambia|  Both|All Ages|Maternal and neon...|  Rate|2012| 7814.344518002015| 9667.960848348446| 6289.146374740097|
|DALYs (Disability...|  Gambia|  Male|All Ages|Substance use dis...|Number|2012| 1659.038707247863| 2126.829520886102|1239.1726985245457|
|DALYs (Disability...|  Gambia|Female|All Ages|Substance use dis...|Number|2012| 874.4324658085982|1186.5605963880798| 618.2717801609034|
|DALYs (Disability...|  Gambia|  B

Radzenie sobie z brakującymi wartościami

In [12]:
data.show

<bound method DataFrame.show of DataFrame[measure: string, location: string, sex: string, age: string, cause: string, metric: string, year: string, val: string, upper: string, lower: string]>

In [13]:
from pyspark.sql import functions as f
# Usuń wiersze z brakującymi wartościami w dowolnej z kolumn
data.na.drop()
# Zastąp brakujące wartości za pomocą średniej
data.na.fill(data.select(f.mean(data['val'])).collect()[0][0])
# Zastąp brakujące wartości nowymi
#data.na.replace(old_value, new_vallue)

DataFrame[measure: string, location: string, sex: string, age: string, cause: string, metric: string, year: string, val: string, upper: string, lower: string]

Pobieranie danych

In [14]:
data.select('year').show(5)

+----+
|year|
+----+
|2012|
|2012|
|2012|
|2012|
|2012|
+----+
only showing top 5 rows



In [15]:
# wybór kilku kolumn
data.select(['location', 'year', 'val']).show(10)

+--------+----+--------------------+
|location|year|                 val|
+--------+----+--------------------+
|  Gambia|2012|   7475.212699705153|
|  Gambia|2012|   7814.344518002015|
|  Gambia|2012|   1659.038707247863|
|  Gambia|2012|   874.4324658085982|
|  Gambia|2012|  2533.4711730564563|
|  Gambia|2012|0.003798563072089447|
|  Gambia|2012|0.002202396944719217|
|  Gambia|2012|0.003038293155919...|
|  Gambia|2012|  179.49365989601577|
|  Gambia|2012|   91.40054305937956|
+--------+----+--------------------+
only showing top 10 rows



Filter

In [16]:
from pyspark.sql.functions import col
data.filter( (col('val') >= 1000) & (col('upper') <= 10000000) )
data.show(5)

+--------------------+--------+------+--------+--------------------+------+----+------------------+------------------+------------------+
|             measure|location|   sex|     age|               cause|metric|year|               val|             upper|             lower|
+--------------------+--------+------+--------+--------------------+------+----+------------------+------------------+------------------+
|DALYs (Disability...|  Gambia|Female|All Ages|Maternal and neon...|  Rate|2012| 7475.212699705153| 9104.773540846287| 6157.428602624385|
|DALYs (Disability...|  Gambia|  Both|All Ages|Maternal and neon...|  Rate|2012| 7814.344518002015| 9667.960848348446| 6289.146374740097|
|DALYs (Disability...|  Gambia|  Male|All Ages|Substance use dis...|Number|2012| 1659.038707247863| 2126.829520886102|1239.1726985245457|
|DALYs (Disability...|  Gambia|Female|All Ages|Substance use dis...|Number|2012| 874.4324658085982|1186.5605963880798| 618.2717801609034|
|DALYs (Disability...|  Gambia|  B

Between

In [17]:
data.filter(data.val.between(1000000, 5000000)).show()

+--------------------+------------------+------+--------+--------------------+------+----+------------------+------------------+------------------+
|             measure|          location|   sex|     age|               cause|metric|year|               val|             upper|             lower|
+--------------------+------------------+------+--------+--------------------+------+----+------------------+------------------+------------------+
|DALYs (Disability...|Russian Federation|  Male|All Ages|  Transport injuries|Number|2013|1563818.2525840718|1730665.1380830628|1422922.2224031396|
|DALYs (Disability...|Russian Federation|  Both|All Ages|  Transport injuries|Number|2013|2348269.3054279066|2665389.1249823105| 2068902.188470523|
|DALYs (Disability...|          Thailand|  Both|All Ages|Diabetes and kidn...|Number|2013|1215245.6178658458| 1387336.100516322|1055619.0224713387|
|DALYs (Disability...|        Mozambique|  Male|All Ages|HIV/AIDS and sexu...|Number|2011|2174300.0441490347|274

When

In [18]:
data.select('year', 'val',
f.when(data.year == '2012', 1).otherwise(0)
).show(25)

+----+--------------------+-----------------------------------------+
|year|                 val|CASE WHEN (year = 2012) THEN 1 ELSE 0 END|
+----+--------------------+-----------------------------------------+
|2012|   7475.212699705153|                                        1|
|2012|   7814.344518002015|                                        1|
|2012|   1659.038707247863|                                        1|
|2012|   874.4324658085982|                                        1|
|2012|  2533.4711730564563|                                        1|
|2012|0.003798563072089447|                                        1|
|2012|0.002202396944719217|                                        1|
|2012|0.003038293155919...|                                        1|
|2012|  179.49365989601577|                                        1|
|2012|   91.40054305937956|                                        1|
|2012|   134.6880361780433|                                        1|
|2012|   8646.107047

Like

In [19]:
data.select(
'val',
data.val.rlike('^[9,7]').alias('iso_urrency zaczyba sie na 9 lub␣7')).distinct().show()

+--------------------+----------------------------------+
|                 val|iso_urrency zaczyba sie na 9 lub␣7|
+--------------------+----------------------------------+
|0.007332988142041626|                             false|
|  0.2713034153257434|                             false|
| 0.06367565466684018|                             false|
| 0.19342156900472102|                             false|
|0.003835061317120...|                             false|
|0.032178074330139694|                             false|
|   7093.222503810944|                              true|
|   7502.956806363028|                              true|
| 0.08536612762995582|                             false|
|  260706.33550143513|                             false|
|  1024.1460052312696|                             false|
|  1336.6902679035634|                             false|
|  227114.41419074405|                             false|
|  1332.9571704727955|                             false|
|  22580246.36

GroupBy

In [20]:
data.groupBy('year').count().show()

+----+-----+
|year|count|
+----+-----+
|2016|40392|
|2012|24781|
|2019|40392|
|2017|40392|
|2014|40392|
|2013|39483|
|2018|40392|
|2011| 1224|
|2015|40392|
+----+-----+



Agregacja

In [21]:


from pyspark.sql import functions as f

data.groupBy("year").agg(f.mean("val")).show()
#grupowanie i obliczanie wartości dla grup

+----+------------------+
|year|          avg(val)|
+----+------------------+
|2016|125761.03225030862|
|2012|121301.00826382442|
|2019|126113.55401705152|
|2017|125589.93283513733|
|2014|125758.36794698932|
|2013|127595.00698396392|
|2018|125696.43818551343|
|2011|128072.01051363212|
|2015|125897.23935898131|
+----+------------------+



Wizualizacja danych

In [22]:
from pyspark.sql.functions import col, min, max

df = data.select('year', 'val')\
      .groupBy("year")\
      .agg(min("val").alias("val_min"), 
           max("val").alias("val_max"))\
      .toPandas()
df.head(10)

Unnamed: 0,year,val_min,val_max
0,2016,0.0001017692186211,999610.4490388336
1,2012,0.0001174495225255,9994.58618079852
2,2019,0.000101054875494,9997.116815557363
3,2017,0.0001066226374639,99996.76636858632
4,2014,0.0001086489366267,9997.115100476538
5,2013,0.0001007108344006,99999.98918410508
6,2018,0.0001048873406931,9996.475911845677
7,2011,0.0008754420500049,997.923977543644
8,2015,0.0001029286308104,99994.52912391476


Zapisywanie danych do pliku

In [23]:
# error gdy plik już istnieje
data.write.csv('dataset.csv')
data.write.csv('dataset.json', format='json')
data.write.csv('dataset.parquet', format='parquet')
# wybrane kolumny
data.select(['location_name', 'the_total_mean']).write.csv('dataset.csv')

AnalysisException: path file:/C:/Users/Mikołaj/Desktop/Jupiter/dataset.csv already exists.;