In [None]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import *

In [None]:
# One SparkSession for the whole notebook; getOrCreate() hands back the
# already-running session when the cell is re-executed.
spark = (
    SparkSession.builder
    .appName("RDD")
    .getOrCreate()
)

In [None]:
# Positional column names for the orders DataFrame (applied with toDF below).
order_columns = [
    "order_id",
    "order_date",
    "order_customer_id",
    "order_status",
]

In [None]:
# Raw orders CSV with inferred column types. No header option is set, so the
# columns get Spark's generic positional names until renamed via toDF.
orders = (
    spark.read
    .format("csv")
    .option("inferSchema", "true")
    .load("/user/sovik/retail_db/orders")
)

In [None]:
# Preview the raw orders (first 20 rows, long values truncated).
orders.show(n=20, truncate=True)

In [None]:
# Swap the positional column names for the descriptive ones in order_columns;
# toDF requires exactly one name per existing column.
order_df = orders.toDF(*order_columns)

In [None]:
# Same preview as above, now with the renamed columns.
order_df.show(n=20, truncate=True)

In [None]:
# Small slice of the orders (at most 100 rows) for quick experiments.
order_df_100 = order_df.limit(num=100)

In [None]:
# A bare expression in the middle of a cell is never rendered, so display it explicitly.
display(order_df_100.count())

# Preview of the RDD section below: build an RDD of text lines, split each line
# on single spaces, and pair every word with its character count.
# `sc` is only created in a later cell, so go through the session directly;
# otherwise this cell raises NameError on a fresh kernel (Restart & Run All).
lines_rdd = spark.sparkContext.parallelize(["hello world", "foo bar", "baz"])
word_count_rdd = (
    lines_rdd
    .flatMap(lambda line: line.split(" "))
    .map(lambda word: (word, len(word)))
)

# word_count_rdd now holds (word, word_length) pairs, e.g.
# [('hello', 5), ('world', 5), ('foo', 3), ('bar', 3), ('baz', 3)]

In [None]:
# Short alias for the SparkContext behind the session; used by every RDD cell below.
sc = spark.sparkContext

In [None]:
# Three short lines of text distributed as an RDD.
lines_rdd = sc.parallelize([
    "hello world",
    "foo bar",
    "baz",
])

In [None]:
# flatMap flattens the per-line lists, giving one RDD element per word.
words = lines_rdd.flatMap(lambda text: text.split(" "))

In [None]:
# Bring the flattened words back to the driver and render them.
display(words.collect())

In [None]:
# How many partitions parallelize() produced; no numSlices was passed, so this
# presumably reflects the context's default parallelism — confirm for your cluster.
lines_rdd.getNumPartitions()

In [None]:
# (word, character_count) pairs: split lines into words, then key each word by its length.
word_count_rdd = (
    lines_rdd
    .flatMap(lambda text: text.split(" "))
    .map(lambda token: (token, len(token)))
)

In [None]:
# Classic word-count pairs: each word keyed with an initial count of 1.
# The words RDD built earlier in this section is `words`; `words_rdd` is only
# defined in a later cell, so referencing it here raised NameError on a fresh kernel.
word_pairs_rdd = words.map(lambda word: (word, 1))

In [None]:
# Show the (word, word_length) pairs on the driver.
display(word_count_rdd.collect())

In [None]:
# Show the original, unsplit lines for comparison with the flattened words.
display(lines_rdd.collect())

In [None]:
# Both reduceByKey and groupByKey need an RDD of (key, value) pairs. `words`
# holds bare strings, which PySpark tries to unpack into (key, value) and fails
# at collect time. Use the (word, length) pairs from word_count_rdd instead.
# Total characters per distinct word (sums lengths of repeated words).
word_total_length_rdd = word_count_rdd.reduceByKey(lambda x, y: x + y)
# Every individual length seen for each word, grouped into an iterable.
word_lengths_grouped_rdd = word_count_rdd.groupByKey()

In [None]:
# Materialize the per-word totals on the driver (last expression -> cell output).
word_total_length_rdd.collect()

In [None]:
# Grouped values come back as lazy iterable objects, so this repr is not very
# readable; the next cell prints them as plain lists.
word_lengths_grouped_rdd.collect()

In [None]:
# groupByKey yields an iterable per key; turn each into a list so it prints readably.
for key, grouped_values in word_lengths_grouped_rdd.collect():
    print(key, list(grouped_values))

In [57]:
# End-to-end word count over a slightly larger sample of lines.
lines_rdd = sc.parallelize(["hello world bar baz", "foo bar", "baz"])

# One element per token, splitting each line on single spaces.
words_rdd = lines_rdd.flatMap(lambda text: text.split(" "))

# Key every token with an initial count of 1.
word_pairs_rdd = words_rdd.map(lambda token: (token, 1))

# Sum the 1s per token to get occurrence counts (lazy; not displayed in this cell).
word_counts_rdd = word_pairs_rdd.reduceByKey(lambda left, right: left + right)

# For comparison, groupByKey keeps every individual 1 per token; reduceByKey
# above is typically the more efficient choice for counting.
# NOTE(review): despite its name, word_lengths_grouped_rdd holds lists of 1s
# (one per occurrence), not word lengths.
word_lengths_grouped_rdd = word_pairs_rdd.groupByKey()
for token, ones in word_lengths_grouped_rdd.collect():
    print(token, list(ones))


hello [1]
world [1]
foo [1]
bar [1, 1]
baz [1, 1]


In [61]:
# Sample NZ business CSV: first row is the header, column types are inferred.
# NOTE(review): the option key is spelled "inferschema"; this relies on Spark
# treating data-source option keys case-insensitively.
nz_business = (
    spark.read
    .format("csv")
    .option("header", "true")
    .option("inferschema", "true")
    .load("/user/sovik/retail_db/dummy_text.csv")
)

In [62]:
# First 20 rows with full (untruncated) cell values — the table is wide.
nz_business.show(n=20, truncate=False)

+----+---------------------------+--------------------+--------------------+------------------+-------------+-----------------------------------------------------+---------------------+---------+----------------------------------------------------------------------------------------------------------------+
|Year|Industry_aggregation_NZSIOC|Industry_code_NZSIOC|Industry_name_NZSIOC|Units             |Variable_code|Variable_name                                        |Variable_category    |Value    |Industry_code_ANZSIC06                                                                                          |
+----+---------------------------+--------------------+--------------------+------------------+-------------+-----------------------------------------------------+---------------------+---------+----------------------------------------------------------------------------------------------------------------+
|2021|Level 1                    |99999               |All industries    

In [63]:
# Number of data rows; the header row is not counted because header=true was set on read.
nz_business.count()

57

In [64]:
# Same CSV loaded as raw text lines (an RDD of strings, header line included, no parsing).
nz_business_rdd = sc.textFile(name="/user/sovik/retail_db/dummy_text.csv")

In [None]:
# Small lookup keyed by an nz_business column name.
# NOTE(review): not used by any cell shown here — presumably a placeholder.
reference_data = dict(Industry_aggregation_NZSIOC="value1")

## Accumulator

In [67]:
# Two driver-side counters that tasks can only add to.
accumulator_hello = sc.accumulator(0)
accumulator_foo = sc.accumulator(0)

lines_rdd = sc.parallelize(["hello world", "foo bar", "hello foo", "baz"])

def count_hello_n_foo(line):
    """Increment the module-level accumulators for one line of text.

    A line containing "hello" bumps ``accumulator_hello``; a line containing
    "foo" bumps ``accumulator_foo``; a line with both bumps both.

    ``Accumulator.add`` updates the accumulator object in place. The previous
    ``global ...; acc += 1`` form rebinds the module-level name. That only
    worked because the in-place add happens to return the same accumulator,
    and it needs a ``global`` declaration.
    """
    if "hello" in line:
        accumulator_hello.add(1)
    if "foo" in line:
        accumulator_foo.add(1)


# Apply the function to each line; foreach is an action, so this runs the job.
lines_rdd.foreach(count_hello_n_foo)

# After the action, the driver can read the accumulated values.
print(f"Number of lines containing 'hello': {accumulator_hello.value} n foo: {accumulator_foo.value}")

Number of lines containing 'hello': 2 n foo: 2


## Broadcast

In [68]:
# Word -> replacement mapping, shipped once to each executor as a read-only broadcast.
lookup_table = {"foo": "FOO", "bar": "BAR", "hello": "HELLO"}
broadcast_var = sc.broadcast(lookup_table)

def replace_words(line):
    """Return ``line`` with every whitespace-separated word looked up in the
    broadcast table; words without an entry are kept unchanged. Words are
    re-joined with single spaces."""
    mapping = broadcast_var.value
    replaced = (mapping.get(token, token) for token in line.split())
    return " ".join(replaced)

# Rewrite each line of lines_rdd and bring the results back to the driver.
transformed_lines_rdd = lines_rdd.map(replace_words)
print(transformed_lines_rdd.collect())

['HELLO world', 'FOO BAR', 'HELLO FOO', 'baz']
