In [5]:
import configparser
import os
import sys
from functools import reduce
import matplotlib.pyplot as plt
from pyspark import SparkConf
from pyspark.sql import SparkSession
from pyspark.ml.clustering import KMeans
from pyspark.ml.evaluation import ClusteringEvaluator
from pyspark.ml.feature import VectorAssembler, StandardScaler
from pyspark.sql.functions import explode, split, col, when, expr

In [None]:
PROJECT_ROOT = os.path.abspath(os.path.join(os.getcwd(), "../.."))
DATA = os.path.join(PROJECT_ROOT, "data", "input.txt")
OUTPUT = os.path.join(PROJECT_ROOT, "data", "output")

if not os.path.exists(DATA):
    raise FileNotFoundError(f"Input file not found: {DATA}")

In [14]:
spark = SparkSession.builder \
    .appName("WordCount") \
    .master("local[*]") \
    .getOrCreate()

print(spark.version)
lines = spark.read.text(DATA)
words = lines.select(explode(split(col("value"), "\\s+")).alias("word"))
word_counts = words.groupBy("word").count()
word_counts.show(truncate=False)
word_counts.write.mode("overwrite").csv(OUTPUT)
spark.stop()

4.0.1
+-----+-----+
|word |count|
+-----+-----+
|you  |1    |
|how  |1    |
|hello|1    |
|Hello|2    |
|spark|1    |
|are  |1    |
|world|1    |
|itmo |1    |
+-----+-----+



In [11]:
print("\n=== Spark Configuration Parameters ===")
for k, v in spark.sparkContext.getConf().getAll():
    print(f"{k}: {v}")
print("=== End Configuration ===\n")


=== Spark Configuration Parameters ===
spark.rdd.compress: True
spark.hadoop.fs.s3a.vectored.read.min.seek.size: 128K
spark.app.id: local-1757970588453
spark.sql.warehouse.dir: file:/Users/ivanzolin/Documents/itmo-magistracy/mle-bigdata/labs/bigdata-lab5/src/notebook/spark-warehouse
spark.executor.extraJavaOptions: -Djava.net.preferIPv6Addresses=false -XX:+IgnoreUnrecognizedVMOptions --add-modules=jdk.incubator.vector --add-opens=java.base/java.lang=ALL-UNNAMED --add-opens=java.base/java.lang.invoke=ALL-UNNAMED --add-opens=java.base/java.lang.reflect=ALL-UNNAMED --add-opens=java.base/java.io=ALL-UNNAMED --add-opens=java.base/java.net=ALL-UNNAMED --add-opens=java.base/java.nio=ALL-UNNAMED --add-opens=java.base/java.util=ALL-UNNAMED --add-opens=java.base/java.util.concurrent=ALL-UNNAMED --add-opens=java.base/java.util.concurrent.atomic=ALL-UNNAMED --add-opens=java.base/jdk.internal.ref=ALL-UNNAMED --add-opens=java.base/sun.nio.ch=ALL-UNNAMED --add-opens=java.base/sun.nio.cs=ALL-UNNAMED 