<a href="https://colab.research.google.com/github/JacekPardyak/bn/blob/main/spark.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [2]:
! pip install pyspark

Collecting pyspark
  Downloading pyspark-3.5.1.tar.gz (317.0 MB)
[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m317.0/317.0 MB[0m [31m4.8 MB/s[0m eta [36m0:00:00[0m
[?25h  Preparing metadata (setup.py) ... [?25l[?25hdone
Building wheels for collected packages: pyspark
  Building wheel for pyspark (setup.py) ... [?25l[?25hdone
  Created wheel for pyspark: filename=pyspark-3.5.1-py2.py3-none-any.whl size=317488490 sha256=f6308d6c7d97140512f06d0b37cb34de78218523dba41ea377cbb9204e195a10
  Stored in directory: /root/.cache/pip/wheels/80/1d/60/2c256ed38dddce2fdd93be545214a63e02fbd8d74fb0b7f3a6
Successfully built pyspark
Installing collected packages: pyspark
Successfully installed pyspark-3.5.1


In [5]:
from pyspark.sql import SparkSession

# getOrCreate() returns the already-active session if there is one, so re-running this cell is harmless.
spark = (
    SparkSession.builder
    .appName("demo")
    .getOrCreate()
)

In [6]:
# Display the SparkSession object itself; its notebook repr shows session details (e.g. version / app name).
spark

In [7]:
# Toy dataset: one (first_name, age) tuple per person.
people_rows = [
    ("sue", 32),
    ("li", 3),
    ("bob", 75),
    ("heo", 13),
]
people_columns = ["first_name", "age"]

df = spark.createDataFrame(people_rows, schema=people_columns)
df.show()

+----------+---+
|first_name|age|
+----------+---+
|       sue| 32|
|        li|  3|
|       bob| 75|
|       heo| 13|
+----------+---+



In [16]:
from pyspark.sql.functions import col, when, avg

# Bucket each person by age. Every bucket is an explicit condition (no `.otherwise`),
# so a missing (null) age yields a null life_stage instead of silently becoming "adult".
df = df.withColumn(
    "life_stage",
    when(col("age") < 13, "child")
    .when(col("age").between(13, 19), "teenager")  # between() is inclusive on both ends
    .when(col("age") > 19, "adult"),
)
df.show()

+----------+---+----------+
|first_name|age|life_stage|
+----------+---+----------+
|       sue| 32|     adult|
|        li|  3|     child|
|       bob| 75|     adult|
|       heo| 13|  teenager|
+----------+---+----------+



In [17]:
# filter() is an alias of where(); isin() also accepts the candidate values as varargs.
df.filter(col("life_stage").isin("teenager", "adult")).show()

+----------+---+----------+
|first_name|age|life_stage|
+----------+---+----------+
|       sue| 32|     adult|
|       bob| 75|     adult|
|       heo| 13|  teenager|
+----------+---+----------+



In [18]:
# Whole-table aggregate: agg() without a groupBy() averages over all rows.
df.agg(avg("age")).show()

+--------+
|avg(age)|
+--------+
|   30.75|
+--------+



In [20]:
# Average age per life stage. Name the column explicitly: a bare .avg() averages every
# numeric column, so its output would silently change if numeric columns were added.
# orderBy() makes the row order deterministic (group-by output order is unspecified).
df.groupBy("life_stage").avg("age").orderBy("life_stage").show()

+----------+--------+
|life_stage|avg(age)|
+----------+--------+
|     adult|    53.5|
|     child|     3.0|
|  teenager|    13.0|
+----------+--------+



In [21]:
# Same aggregation with a readable column name; orderBy() fixes the otherwise unspecified row order.
(
    df.groupBy("life_stage")
    .agg(avg("age").alias("Average_age"))
    .orderBy("life_stage")
    .show()
)

+----------+-----------+
|life_stage|Average_age|
+----------+-----------+
|     adult|       53.5|
|     child|        3.0|
|  teenager|       13.0|
+----------+-----------+



In [22]:
# `{tbl}` is a placeholder that spark.sql() binds to the DataFrame passed as the `tbl` keyword.
spark.sql(
    "select avg(age) from {tbl}",
    tbl=df,
).show()

+--------+
|avg(age)|
+--------+
|   30.75|
+--------+



In [23]:
# SQL version of the groupBy cells; `order by` makes the row order deterministic.
spark.sql(
    "select life_stage, avg(age) from {tbl} group by life_stage order by life_stage",
    tbl=df,
).show()

+----------+--------+
|life_stage|avg(age)|
+----------+--------+
|     adult|    53.5|
|     child|     3.0|
|  teenager|    13.0|
+----------+--------+



In [24]:
# Persist `df` as a managed table. The default save mode ("errorifexists") fails when
# some_people already exists, e.g. when this cell is re-run; overwrite keeps it re-runnable.
df.write.mode("overwrite").saveAsTable("some_people")

In [25]:
# Read the persisted table back through the SQL catalog.
people_table = spark.sql("select * from some_people")
people_table.show()

+----------+---+----------+
|first_name|age|life_stage|
+----------+---+----------+
|       bob| 75|     adult|
|       heo| 13|  teenager|
|       sue| 32|     adult|
|        li|  3|     child|
+----------+---+----------+



In [26]:
# Append one row; column values are positional (first_name, age, life_stage).
insert_frank = "INSERT INTO some_people VALUES ('frank', 4, 'child')"
spark.sql(insert_frank)

DataFrame[]

In [27]:
# Re-query the table to confirm the inserted row is visible.
people_after_insert = spark.sql("select * from some_people")
people_after_insert.show()

+----------+---+----------+
|first_name|age|life_stage|
+----------+---+----------+
|       bob| 75|     adult|
|       heo| 13|  teenager|
|     frank|  4|     child|
|       sue| 32|     adult|
|        li|  3|     child|
+----------+---+----------+



In [28]:
# Filter in SQL instead of with the DataFrame API.
teenagers = spark.sql("select * from some_people where life_stage='teenager'")
teenagers.show()

+----------+---+----------+
|first_name|age|life_stage|
+----------+---+----------+
|       heo| 13|  teenager|
+----------+---+----------+



The Kafka example won't run in this notebook, but you can find it on the Apache Spark examples page: https://spark.apache.org/examples.html

In [33]:
my_string = """these are words
these are more words
words in english
"""
# Persist the sample text so Spark can read it back with textFile().
with open("some_words.txt", "w") as out_file:
    out_file.write(my_string)

In [35]:
text_file = spark.sparkContext.textFile("some_words.txt")

# Classic word count. str.split() with no argument splits on any run of whitespace and
# drops empty strings, so repeated spaces, tabs or blank lines don't produce '' "words"
# (split(" ") would).
counts = (
    text_file.flatMap(lambda line: line.split())
    .map(lambda word: (word, 1))
    .reduceByKey(lambda a, b: a + b)
)
counts.collect()

[('these', 2),
 ('are', 2),
 ('more', 1),
 ('in', 1),
 ('words', 3),
 ('english', 1)]

In [37]:
# Write a tiny module to disk. Run as a script it greets "World"; imported, it only
# defines greet(), thanks to the __main__ guard. The closing quotes sit at column 0 so
# the file ends with a single newline and has no trailing whitespace.
my_string = """def greet(name):
    print(f"Hello, {name}!")


if __name__ == "__main__":
    # This block will only execute if the script is run directly
    greet("World")
"""
with open("greet.py", "w") as file:
    file.write(my_string)

In [38]:
# Run greet.py as a standalone script in a separate process, so its
# `if __name__ == "__main__"` block calls greet("World").
!python greet.py

Hello, World!


In [39]:
# Import the module written earlier; the __main__ guard keeps greet("World") from running on import.
import greet

greet.greet(name="Alice")

Hello, Alice!


# KMeans

In [46]:
# Six points in LibSVM text format ("<label> <index>:<value> ..."), forming two groups:
# one near 0 and one near 9. The closing quotes sit at column 0 so the file ends with a
# newline and no whitespace-only trailing line.
sample_kmeans_data = """0 1:0.0 2:0.0 3:0.0
1 1:0.1 2:0.1 3:0.1
2 1:0.2 2:0.2 3:0.2
3 1:9.0 2:9.0 3:9.0
4 1:9.1 2:9.1 3:9.1
5 1:9.2 2:9.2 3:9.2
"""
with open("sample_kmeans_data.txt", "w") as file:
    file.write(sample_kmeans_data)

In [47]:
from pyspark.ml.clustering import KMeans
from pyspark.ml.evaluation import ClusteringEvaluator

In [50]:
# The libsvm reader turns each "<label> <index>:<value> ..." line into a `label` column
# and a sparse `features` vector (the file's 1-based indices become 0-based positions).
dataset = spark.read.load("sample_kmeans_data.txt", format="libsvm")
dataset.show()

+-----+--------------------+
|label|            features|
+-----+--------------------+
|  0.0|           (3,[],[])|
|  1.0|(3,[0,1,2],[0.1,0...|
|  2.0|(3,[0,1,2],[0.2,0...|
|  3.0|(3,[0,1,2],[9.0,9...|
|  4.0|(3,[0,1,2],[9.1,9...|
|  5.0|(3,[0,1,2],[9.2,9...|
+-----+--------------------+



In [51]:
# Fit k-means with two clusters; the fixed seed makes the random initialization reproducible.
kmeans = KMeans(k=2, seed=1)
model = kmeans.fit(dataset)

# Assign each row of the dataset to a cluster.
predictions = model.transform(dataset)

# Silhouette lies in [-1, 1]; values near 1 mean tight, well-separated clusters.
evaluator = ClusteringEvaluator()
silhouette = evaluator.evaluate(predictions)
print(f"Silhouette with squared euclidean distance = {silhouette}")

# One center vector per cluster.
centers = model.clusterCenters()
print("Cluster Centers: ")
for center in centers:
    print(center)

Silhouette with squared euclidean distance = 0.9997530305375207
Cluster Centers: 
[9.1 9.1 9.1]
[0.1 0.1 0.1]


For more Spark ML examples in Python, see https://github.com/apache/spark/tree/master/examples/src/main/python/ml