<a href="https://colab.research.google.com/github/d7lewandowski/pyspark-script/blob/main/dataalgoritmswithpyspark/pyspark_basic_func.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [None]:
!pip install pyspark

Collecting pyspark
  Downloading pyspark-3.5.1.tar.gz (317.0 MB)
[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m317.0/317.0 MB[0m [31m2.7 MB/s[0m eta [36m0:00:00[0m
[?25h  Preparing metadata (setup.py) ... [?25l[?25hdone
Building wheels for collected packages: pyspark
  Building wheel for pyspark (setup.py) ... [?25l[?25hdone
  Created wheel for pyspark: filename=pyspark-3.5.1-py2.py3-none-any.whl size=317488491 sha256=56154bc9b72e001ab6e38e535861365626cbbd6dbbc010a6802ccb4d93bec872
  Stored in directory: /root/.cache/pip/wheels/80/1d/60/2c256ed38dddce2fdd93be545214a63e02fbd8d74fb0b7f3a6
Successfully built pyspark
Installing collected packages: pyspark
Successfully installed pyspark-3.5.1


In [None]:
from pyspark.sql import SparkSession

In [None]:
# RDD example

In [None]:
# Start (or reuse) a local SparkSession for the RDD examples.
spark = (
    SparkSession.builder
    .master('local')
    .appName('app')
    .getOrCreate()
)

In [None]:
# Sample (key, value) pairs; some values are negative so the filter step has work to do.
tuples = [
    ('A', 7), ('A', 8), ('A', -4),
    ('B', 3), ('B', 9), ('B', -1),
    ('C', 1), ('C', 5),
]

print(tuples)

[('A', 7), ('A', 8), ('A', -4), ('B', 3), ('B', 9), ('B', -1), ('C', 1), ('C', 5)]


In [None]:
# Distribute the local list as an RDD, then collect it back to check the contents.
rdd = spark.sparkContext.parallelize(c=tuples)

rdd.collect()

[('A', 7),
 ('A', 8),
 ('A', -4),
 ('B', 3),
 ('B', 9),
 ('B', -1),
 ('C', 1),
 ('C', 5)]

In [None]:
# Keep only strictly positive values: the `> 0` test drops negatives AND zeros.
positives = rdd.filter(lambda pair: pair[1] > 0)
positives.collect()

[('A', 7), ('A', 8), ('B', 3), ('B', 9), ('C', 1), ('C', 5)]

In [None]:
# Find sum and avg per key using groupByKey(): all values of a key are gathered
# into one iterable, then reduced to (total, mean) on the grouped side.
def _sum_and_mean(values):
    """Return (total, mean) for the grouped values of a single key."""
    values = list(values)  # materialise once so the group is not iterated twice
    # Uses the builtin `sum`; NOTE(review): a later
    # `from pyspark.sql.functions import sum` rebinds `sum` if this cell is re-run after it.
    total = sum(values)
    return total, total / len(values)  # Python 3 `/` is true division

sum_and_avg = positives.groupByKey().mapValues(_sum_and_mean)

In [None]:
# Collect the (key, (sum, avg)) results to the driver and display them.
sum_and_avg.collect()

[('A', (15, 7.5)), ('B', (12, 6.0)), ('C', (6, 3.0))]

In [None]:
# Find sum and avg per key using reduceByKey(), step 1: pair every value with a
# count of 1 so sums and counts can be accumulated together in a single reduce.
sum_count = positives.mapValues(lambda v: (v, 1))

sum_count.collect()

[('A', (7, 1)),
 ('A', (8, 1)),
 ('B', (3, 1)),
 ('B', (9, 1)),
 ('C', (1, 1)),
 ('C', (5, 1))]

In [None]:
# Step 2: aggregate (sum, count) per key. reduceByKey merges partial results
# pairwise, so the combiner adds sums to sums and counts to counts.
def _merge_sum_count(left, right):
    """Merge two (sum, count) partial aggregates into one."""
    return (left[0] + right[0], left[1] + right[1])

sum_count_agg = sum_count.reduceByKey(_merge_sum_count)
sum_count_agg.collect()

[('A', (15, 2)), ('B', (12, 2)), ('C', (6, 2))]

In [None]:
# Step 3: finalize sum and avg per key, mapping (sum, count) -> (sum, sum / count).
# Python 3's `/` is already true division, so no float() cast is needed (casting
# the quotient afterwards would not have prevented integer division anyway).
sum_and_avg = sum_count_agg.mapValues(lambda v: (v[0], v[0] / v[1]))

sum_and_avg.collect()

[('A', (15, 7.5)), ('B', (12, 6.0)), ('C', (6, 3.0))]

In [None]:
# Fetch just the first 3 records to the driver instead of collecting everything.
sum_and_avg.take(num=3)

[('A', (15, 7.5)), ('B', (12, 6.0)), ('C', (6, 3.0))]

In [None]:
# Draw one random record without replacement. The fixed seed makes the sample
# reproducible on Restart & Run All; without it each run may pick a different key.
SAMPLE_SEED = 42
sum_and_avg.takeSample(withReplacement=False, num=1, seed=SAMPLE_SEED)

[('B', (12, 6.0))]

In [None]:
# DataFrame example

In [None]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import avg, sum

In [None]:
# Get the SparkSession used by the DataFrame examples. getOrCreate() returns the
# session already started earlier in this notebook, so appName('demo') does not
# rename the running application.
spark = SparkSession.builder.appName('demo').getOrCreate()

In [None]:
# (dept, name, hours) rows turned into a DataFrame with named columns.
dept_emps = [
    ("Sales", "Barb", 40), ("Sales", "Dan", 20),
    ("IT", "Alex", 22), ("IT", "Jane", 24),
    ("HR", "Alex", 20), ("HR", "Mary", 30),
]
df = spark.createDataFrame(dept_emps, schema=['dept', 'name', 'hours'])

In [None]:
# Print the DataFrame's rows as a text table.
df.show()

+-----+----+-----+
| dept|name|hours|
+-----+----+-----+
|Sales|Barb|   40|
|Sales| Dan|   20|
|   IT|Alex|   22|
|   IT|Jane|   24|
|   HR|Alex|   20|
|   HR|Mary|   30|
+-----+----+-----+



In [None]:
# Group rows by department and compute each department's average and total hours.
# Functions are referenced through the `F.` namespace: a bare `sum` imported from
# pyspark.sql.functions shadows Python's builtin `sum`, which the plain-Python
# RDD lambdas earlier in this notebook rely on.
from pyspark.sql import functions as F

averages = df.groupBy('dept').agg(
    F.avg('hours').alias('avg'),
    F.sum('hours').alias('total'),
)

In [None]:
# Show the results of the final execution. show() is an action, so this is where
# the lazy groupBy/agg above actually runs. groupBy does not keep input order
# (note the output order); add .orderBy('dept') if a stable order is needed.
averages.show()

+-----+----+-----+
| dept| avg|total|
+-----+----+-----+
|Sales|30.0|   60|
|   HR|25.0|   50|
|   IT|23.0|   46|
+-----+----+-----+



In [None]:
# Local collection of (key, value) pairs; the next cell turns it into an RDD.
data = [
    ("fox", 6), ("dog", 5), ("fox", 3), ("dog", 8),
    ("cat", 1), ("cat", 2), ("cat", 3), ("cat", 4)
]
print(data)

# getOrCreate() returns the session already running in this notebook, so
# master/appName here do not reconfigure the existing SparkContext.
# (The former `.config('', '')` set a meaningless empty-key option and was removed.)
spark = SparkSession.builder.master('local').appName('app-name').getOrCreate()
sc = spark.sparkContext

[('fox', 6), ('dog', 5), ('fox', 3), ('dog', 8), ('cat', 1), ('cat', 2), ('cat', 3), ('cat', 4)]


In [None]:
# Create an RDD from the local collection (rebinds `rdd`), then collect it to inspect.
rdd = sc.parallelize(c=data)

rdd.collect()

[('fox', 6),
 ('dog', 5),
 ('fox', 3),
 ('dog', 8),
 ('cat', 1),
 ('cat', 2),
 ('cat', 3),
 ('cat', 4)]

In [None]:
# Number of elements in the RDD (an action, so it triggers a Spark job).
rdd.count()

8

In [None]:
# Sum the values of each key; reduceByKey folds a key's values together pairwise.
sum_per_key = rdd.reduceByKey(func=lambda left, right: left + right)

sum_per_key.collect()

[('fox', 9), ('dog', 13), ('cat', 10)]

In [None]:
# Cross-check the per-key sums from the previous cell with foldByKey, which takes an
# explicit zero value, instead of re-running an identical reduceByKey cell.
from operator import add

sum_per_key_fold = rdd.foldByKey(0, add)
assert sorted(sum_per_key_fold.collect()) == sorted(sum_per_key.collect())

sum_per_key_fold.collect()

[('fox', 9), ('dog', 13), ('cat', 10)]

In [None]:
# Keep only keys whose total is strictly greater than MIN_TOTAL.
MIN_TOTAL = 9
sum_filtered = sum_per_key.filter(lambda kv: kv[1] > MIN_TOTAL)

In [None]:
# Bring the filtered (key, total) pairs back to the driver for display.
sum_filtered.collect()

[('dog', 13), ('cat', 10)]

In [None]:
# Group every value of each key into one iterable. Unlike reduceByKey, all values
# are shipped through the shuffle, so prefer reduceByKey for plain aggregations.
grouped = rdd.groupByKey()

In [None]:
# ResultIterable objects display as opaque memory addresses that change every run;
# convert each group to a list so the grouped values are visible and reproducible.
grouped.mapValues(list).collect()

[('fox', <pyspark.resultiterable.ResultIterable at 0x7a05a474fbe0>),
 ('dog', <pyspark.resultiterable.ResultIterable at 0x7a05a474e410>),
 ('cat', <pyspark.resultiterable.ResultIterable at 0x7a05a474f310>)]

SyntaxError: invalid syntax (<ipython-input-144-062b053f1f82>, line 1)