In [1]:
import os, sys
# Environment for PySpark on YARN: the Hadoop/YARN client configuration
# directory, the Python interpreter for executors and driver, and the Hadoop
# user name (hardcoded here — adjust to your own account).
os.environ.update({
    'HADOOP_CONF_DIR': '/etc/hadoop/conf',
    'YARN_CONF_DIR': '/etc/hadoop/conf',
    'PYSPARK_PYTHON': 'python3.9',
    'PYSPARK_DRIVER_PYTHON': 'python3.9',
    'HADOOP_USER_NAME': 'ssenigov',
})

from pyspark.sql import SparkSession
from pyspark import SparkContext, SparkConf

In [2]:
# Spark settings for this demo: application name, YARN as the master,
# and Adaptive Query Execution switched off.
conf = SparkConf()
conf.setAppName('CodeToCluster')
conf.setMaster('yarn')
conf.set('spark.sql.adaptive.enabled', 'false')

# Get (or create) the session and report the application id it runs under.
spark = SparkSession.builder.config(conf=conf).getOrCreate()
print(f"{'app_id':<40}", spark.sparkContext.applicationId)

'''
A PySpark application consists of two parts: the driver, and the code that runs in distributed containers on the worker nodes.
They are separate processes, so we can’t exchange data between them the way we would in a standard Python program, through variables and data objects.

The driver doesn’t see the processed data until we ASK for it, for example with DataFrame.show() or rdd.collect().

After all, the slogan of Apache Spark, as of the MapReduce paradigm, is «Move code to data», not data to code.
And Apache Spark does it! At least with RDD and DataFrame objects: their data is distributed over the workers in the cluster, and the CODE of their methods runs distributed over those workers as well.

But how can we get our own CUSTOM application CODE executed in a distributed way on the workers in the cluster?

Imagine we’re building an antispam system. Distributed workers receive messages and set a SPAM or NOT_SPAM tag on each of them. We’ve got a custom Python function «get_tag_message(message)».

How can we run this custom Python function in a distributed way?

There are 3 simple ways, using the functionality of RDD and DataFrame:
1. Use RDD.map(). With map() we can modify the values in the RDD.
2. Use DataFrame.foreach() or DataFrame.foreachPartition(). With foreach() we can NOT modify the values in the DataFrame, only perform custom operations with them (for example, update accumulators).
3. Use a User-Defined Function (UDF) within a DataFrame. With a UDF we can put its return value into a new column.

See the examples in the cells below.

And look! The custom function is not obliged to use the data from the RDD or DataFrame.
The RDD or DataFrame can be just a distributed placeholder that drives data retrieval, or data generation and processing, as in the calculation of Pi in one of the previous posts.
'''

Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
24/12/30 15:51:30 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
24/12/30 15:51:32 WARN DomainSocketFactory: The short-circuit local reads feature cannot be used because libhadoop cannot be loaded.
24/12/30 15:51:33 WARN Client: Neither spark.yarn.jars nor spark.yarn.archive is set, falling back to uploading libraries under SPARK_HOME.


app_id                                   application_1727681258360_0065


'\nPySpark application consists of two parts: one is driver and the other is run in distributed containers on working nodes.\nThey are separate processes and we can’t exchange data between them as we would do in a standard python program with variables and data objects.\n\nDriver doesn’t see the processed data until we ASK for it for example with DataFrame.show() or rdd.collect().\n\nAfter all, Apache Spark as well as MapReduce paradigm slogan is «Move code to data», not data to code.\nAnd Apache Spark does it! At least with objects RDD and DataFrames – their data is distributed over workers in cluster and CODE of their methods as well distributed over workers in cluster.\n\nBut how one can have CUSTOM application CODE executed distributed over workers in cluster?\n\nImagine we’re building an antispam system. Distributed workers receive messages and set SPAM or NOT_SPAM tag. We’ve got a custom python function «get_tag_message(message)».\n\nHow can we run this custom python function dis

In [3]:
# Tag a message 'SPAM' when enough spam key words (3 by default) are found in it.
def get_tag_message(message: str,
                    words_to_find=('discount', 'gain', 'only', 'today'),
                    min_matches: int = 3) -> str:
    """Classify a message as 'SPAM' or 'NOT_SPAM' by counting key words it contains.

    Each key word is counted at most once, using a case-sensitive substring
    search (so e.g. 'gain' also matches inside 'again').

    Args:
        message: text to classify. ``None`` (e.g. a null column value handed
            to the function by a UDF) is treated as an empty message instead
            of raising.
        words_to_find: key words to look for.
        min_matches: how many distinct key words must be present for the
            message to be tagged 'SPAM'.

    Returns:
        'SPAM' if at least ``min_matches`` key words are found, else 'NOT_SPAM'.
    """
    text = message if message is not None else ''
    found = sum(1 for word in words_to_find if word in text)
    return 'SPAM' if found >= min_matches else 'NOT_SPAM'

# Sample DataFrame of 60 rows: the 6 messages below repeated 10 times (in
# the same order each time), one message per row in the column "msg".
messages = ['only for you discount and gain', 'today for you discount',
            'today discount 10$, profit', 'only today am i here',
            'gain with discount today', 'today only you is at work']

data = [[msg] for _rep in range(10) for msg in messages]  # 10 consecutive copies
df = spark.sparkContext.parallelize(data).toDF(["msg"])

                                                                                

In [4]:
# Use RDD.map(). With map() we can modify value in RDD
from pyspark.sql import Row

def for_each_rdd_row(row):
    """Build a one-field Row holding "<tag padded to 8 chars> | <message>".

    Used as the RDD.map() mapper below, so it is applied to every row of the RDD.
    """
    text = row.msg  # row[0] and row['msg'] would work too
    label = get_tag_message(text)  # the custom tagging function
    return Row(f"{label:8} | {text}")  # the tag becomes part of the value itself

# Tag every row through RDD.map() and turn the result back into a one-column
# DataFrame. The mapper is passed directly: a lambda that only forwards its
# argument adds nothing.
df_res = df.rdd.map(for_each_rdd_row).toDF(["tagged_msg"])
df_res.groupBy('tagged_msg').count().orderBy(['tagged_msg']).show(truncate=False)



+-----------------------------------------+-----+
|tagged_msg                               |count|
+-----------------------------------------+-----+
|NOT_SPAM | only today am i here          |10   |
|NOT_SPAM | today discount 10$, profit    |10   |
|NOT_SPAM | today for you discount        |10   |
|NOT_SPAM | today only you is at work     |10   |
|SPAM     | gain with discount today      |10   |
|SPAM     | only for you discount and gain|10   |
+-----------------------------------------+-----+



                                                                                

In [5]:
# Use DataFrame.foreach() 
# With foreach() we can NOT modify value in DataFrame, just make any custom operations with it
from pyspark import Accumulator

# Two counters starting at 0; tasks add to them and the driver reads the totals.
count_spam, count_not_spam = (spark.sparkContext.accumulator(0),
                              spark.sparkContext.accumulator(0))

def for_each_df_row(row):
    """Tag one row's ``msg`` and increment the matching accumulator.

    Passed to ``DataFrame.foreach`` below, which calls it for every row and
    ignores its return value; results reach the driver only through the
    ``count_spam`` / ``count_not_spam`` accumulators.
    """
    message = row.msg  # or row[0] or row['msg']
    tag = get_tag_message(message)  # call custom function
    # Accumulator.add() updates the counter in place, so no `global`
    # declaration (needed only to rebind the name with `+=`) is required.
    if tag == 'SPAM':
        count_spam.add(1)
    elif tag == 'NOT_SPAM':
        count_not_spam.add(1)

# foreach() is an action that returns None: it only runs for_each_df_row on every row.
df.foreach(for_each_df_row)
# Accumulator totals are read back on the driver once the action has finished.
print(f'Result: NOT_SPAM: {count_not_spam.value}, SPAM: {count_spam.value}')

Result: NOT_SPAM: 40, SPAM: 20


In [6]:
# Use DataFrame.foreachPartition()
# With foreachPartition() we can NOT modify values in DataFrame, just make any custom operations with it
from pyspark import Accumulator

# Fresh counters for this example; tasks add to them and the driver reads the totals.
count_spam, count_not_spam = (spark.sparkContext.accumulator(0),
                              spark.sparkContext.accumulator(0))

def for_each_df_partition(rows_iterator):
    """Tag every row of one partition and add the partition's totals to the accumulators.

    Passed to ``DataFrame.foreachPartition`` below, which calls it once per
    partition with an iterator over that partition's rows. Counts are kept in
    local variables and pushed to ``count_spam`` / ``count_not_spam`` with a
    single ``add()`` each, instead of one accumulator update per row.
    """
    n_spam = 0
    n_not_spam = 0
    for row in rows_iterator:  # rows_iterator yields the rows of the processed partition
        tag = get_tag_message(row.msg)  # call custom function (row[0] / row['msg'] also work)
        if tag == 'SPAM':
            n_spam += 1
        elif tag == 'NOT_SPAM':
            n_not_spam += 1
    count_spam.add(n_spam)
    count_not_spam.add(n_not_spam)

# foreachPartition() is an action that returns None; it calls
# for_each_df_partition once per partition (with an iterator over that
# partition's rows), not once per row.
df.foreachPartition(for_each_df_partition)
# Accumulator totals are read back on the driver once the action has finished.
print(f'Result: NOT_SPAM: {count_not_spam.value}, SPAM: {count_spam.value}')

Result: NOT_SPAM: 40, SPAM: 20


In [7]:
# Use User-Defined Function within DataFrame. With udf we can return its value to a new column.
from pyspark.sql.functions import col, udf
from pyspark.sql.types import StringType

# Wrap the plain Python function as a Spark UDF returning a string column.
# Passing the function itself (not a forwarding lambda) also gives the UDF
# the name 'get_tag_message' instead of '<lambda>'.
udf_get_tag_of_message = udf(get_tag_message, StringType())  # define udf

# Add a 'spam_tag' column computed by the UDF, then show how many messages
# carry each tag.
(df.withColumn('spam_tag', udf_get_tag_of_message(col('msg')))  # call udf
   .groupBy('spam_tag')
   .count()
   .orderBy('spam_tag')
   .show(truncate=False))



+--------+-----+
|spam_tag|count|
+--------+-----+
|NOT_SPAM|40   |
|SPAM    |20   |
+--------+-----+



                                                                                

In [9]:
# Shut down the SparkSession (and its underlying SparkContext) at the end of the demo.
spark.stop()