### Higher Order Functions
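In PySpark, higher-order functions are functions that take other functions as arguments or return functions as results. They let you describe a data processing step as a small function that Spark applies for you, which simplifies many tasks and makes more complex transformations possible. This section covers RDD methods that take a Python function (map(), flatMap(), reduce(), filter()), Python's functools.reduce applied to lists of DataFrames, DataFrame.transform(), and Spark SQL array functions such as exists() and aggregate() that apply a lambda to each element of an array column.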
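As a plain-Python illustration of the definition (no Spark involved), the sketch below shows both kinds. `apply_twice` is a hypothetical helper that takes a function and returns a new function, while the built-in `map()` takes a function as its first argument.

```python
def apply_twice(func):
    """Return a new function that applies `func` two times."""
    def wrapper(x):
        return func(func(x))
    return wrapper

def add_three(x):
    return x + 3

# apply_twice takes a function and returns a function
add_six = apply_twice(add_three)
print(add_six(10))  # 16

# map() takes a function as an argument and applies it to every element
print(list(map(add_three, [1, 2, 3])))  # [4, 5, 6]
```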

### map()

- map() is an RDD transformation that applies a function (often a lambda) to every element of an RDD and returns a new RDD containing the results, one output element per input element.

**Note**: PySpark DataFrames have no map() method, so you first convert the DataFrame to an RDD of Row objects with `df.rdd`, map over it, and convert the result back with `spark.createDataFrame()` or `toDF()`.

In [0]:
# import data types and SQL functions
from pyspark.sql.types import StructType, StructField, IntegerType, StringType, FloatType, BooleanType, DoubleType, ArrayType
from pyspark.sql.functions import *
# import Python's reduce last so that no same-named name from the wildcard import above shadows it
from functools import reduce

In [0]:
# create a sample dataframe
df1 = spark.createDataFrame([(1, "apArna S"), (2, "aiswarya satheesh"), (3, "athulya Dev")], ["id", "value"])

# convert the dataframe to an RDD
rdd1 = df1.rdd

In [0]:
# define a function to apply to each row in the RDD
def proper_case_row(row):
    row_id = row["id"]
    value = row["value"]
    # str.title() upper-cases the first letter of each word and lower-cases the rest
    return (row_id, value.title())

In [0]:
# use map() to apply proper_case_row() to each row in the RDD
new_rdd = rdd1.map(proper_case_row)

In [0]:
# convert the transformed RDD back to a dataframe
new_df = spark.createDataFrame(new_rdd, ["id", "new_value"])

In [0]:
# join the original dataframe with the transformed dataframe on the "id" column
result_df = df1.join(new_df, on="id")

# show the result dataframe
result_df.show()

+---+-----------------+-----------------+
| id|            value|        new_value|
+---+-----------------+-----------------+
|  1|         apArna S|         Aparna S|
|  2|aiswarya satheesh|Aiswarya Satheesh|
|  3|      athulya Dev|      Athulya Dev|
+---+-----------------+-----------------+



### flatMap()

- flatMap() is an RDD transformation that applies a function returning an iterable (such as a list) to each element and flattens all the returned iterables into a single RDD, so each input element can produce zero or more output elements. With the identity function `lambda x: x` it simply flattens an RDD of lists.
- Typical uses include splitting lines of text into words and expanding lists of values into one record per value.

In [0]:
# create a sample RDD with nested lists
rdd = sc.parallelize([['#analytics', '#deeplearning', '#artificialintelligence', '#python', '#ai'], 
 ['#analytics', '#datascience', '#deeplearning', '#visualization', '#dataanalysis'],
 ['#datavisualization', '#machinelearning', '#dataanalysis', '#cloudcomputing', '#neuralnetworks'],
 ['#python', '#datascience', '#dataengineering', '#nlp', '#algorithms'],
 ['#statistics', '#machinelearning', '#deeplearning', '#ai', '#bigdata']])



In [0]:
# use flatMap to convert the nested list to a flat list of hashtags
flat_rdd = rdd.flatMap(lambda x: x)

# use map to create a key-value pair of (hashtag, 1) for each hashtag
hashtag_counts = flat_rdd.map(lambda x: (x, 1))

# use reduceByKey to count the number of occurrences of each hashtag
hashtag_occurrences = hashtag_counts.reduceByKey(lambda x, y: x + y)

# convert the RDD to a DataFrame with column names
df = hashtag_occurrences.toDF(["hashtag", "count"])

# show the DataFrame
df.show()

+--------------------+-----+
|             hashtag|count|
+--------------------+-----+
|#artificialintell...|    1|
|       #dataanalysis|    2|
|            #bigdata|    1|
|    #dataengineering|    1|
|             #python|    2|
|        #datascience|    2|
|     #cloudcomputing|    1|
|      #visualization|    1|
|     #neuralnetworks|    1|
|         #statistics|    1|
|       #deeplearning|    3|
|                #nlp|    1|
|          #analytics|    2|
|                 #ai|    2|
|  #datavisualization|    1|
|    #machinelearning|    2|
|         #algorithms|    1|
+--------------------+-----+



### reduce()
- reduce() is an RDD action that aggregates all elements of an RDD into a single value using a function of two arguments. Spark first reduces each partition locally and then combines the partial results on the driver, so the function must be associative and commutative (addition qualifies, subtraction does not).
- Because each partition is reduced in parallel, reduce() scales to large datasets. For per-key aggregation, `reduceByKey()` (a transformation on RDDs of key-value pairs) applies the same idea to each key, as the first example below shows; the later examples use Python's `functools.reduce`, which folds a plain list on the driver.

In [0]:
# Create an RDD containing network packet data
packet_data = sc.parallelize([
    {"src_ip": "192.168.1.1", "dest_ip": "8.8.8.8", "bytes_sent": 1000, "bytes_received": 500},
    {"src_ip": "192.168.1.2", "dest_ip": "8.8.8.8", "bytes_sent": 1500, "bytes_received": 1000},
    {"src_ip": "192.168.1.3", "dest_ip": "8.8.8.8", "bytes_sent": 2000, "bytes_received": 750},
    {"src_ip": "192.168.1.1", "dest_ip": "8.8.8.8", "bytes_sent": 500, "bytes_received": 200},
    {"src_ip": "192.168.1.2", "dest_ip": "8.8.8.8", "bytes_sent": 1000, "bytes_received": 500},
    {"src_ip": "192.168.1.3", "dest_ip": "8.8.8.8", "bytes_sent": 500, "bytes_received": 250},
])

# Define a function to aggregate the bytes sent and received by each IP address
def aggregate_bytes_by_ip(x, y):
    return {"src_ip": x["src_ip"], 
            "bytes_sent": x["bytes_sent"] + y["bytes_sent"], 
            "bytes_received": x["bytes_received"] + y["bytes_received"]}


In [0]:
# map() turns each record into a (key, value) pair keyed by source IP, with the whole record as the value, e.g.
# ('192.168.1.1', {'src_ip': '192.168.1.1', 'dest_ip': '8.8.8.8', 'bytes_sent': 1000, 'bytes_received': 500})
# reduceByKey() then merges all values that share a key using aggregate_bytes_by_ip
ip_byte_counts_rdd = packet_data.map(lambda x: (x["src_ip"], x)).reduceByKey(aggregate_bytes_by_ip)
ip_byte_counts_rdd.collect()

Out[27]: [('192.168.1.3',
  {'src_ip': '192.168.1.3', 'bytes_sent': 2500, 'bytes_received': 1000}),
 ('192.168.1.2',
  {'src_ip': '192.168.1.2', 'bytes_sent': 2500, 'bytes_received': 1500}),
 ('192.168.1.1',
  {'src_ip': '192.168.1.1', 'bytes_sent': 1500, 'bytes_received': 700})]

In [0]:
# Convert the result to a DataFrame
ip_byte_counts_df = ip_byte_counts_rdd.map(lambda x: (x[0], x[1]["bytes_sent"], x[1]["bytes_received"])) \
                                      .toDF(["src_ip", "bytes_sent", "bytes_received"])
# Show the result as a DataFrame
ip_byte_counts_df.show()


+-----------+----------+--------------+
|     src_ip|bytes_sent|bytes_received|
+-----------+----------+--------------+
|192.168.1.3|      2500|          1000|
|192.168.1.2|      2500|          1500|
|192.168.1.1|      1500|           700|
+-----------+----------+--------------+



In [0]:
# Create a dataframe of employee names and ages
df_names_ages = spark.createDataFrame([
    ("Alice", 25),
    ("Bob", 30),
    ("Charlie", 35),
    ("David", 40)
], ["name", "age"])

# Create a dataframe of employee addresses
df_addresses = spark.createDataFrame([
    ("Alice", "123 Main St"),
    ("Bob", "456 Oak Ave"),
    ("Charlie", "789 Elm St"),
    ("David", "1011 Maple Ave")
], ["name", "address"])

# Create a dataframe of employee salaries
df_salaries = spark.createDataFrame([
    ("Alice", 50000),
    ("Bob", 60000),
    ("Charlie", 70000),
    ("David", 80000)
], ["name", "salary"])

# Create a dataframe of employee departments
df_departments = spark.createDataFrame([
    ("Alice", "Sales"),
    ("Bob", "Marketing"),
    ("Charlie", "Engineering"),
    ("David", "Finance")
], ["name", "department"])

# Show the dataframes
df_names_ages.show()
df_addresses.show()
df_salaries.show()
df_departments.show()


+-------+---+
|   name|age|
+-------+---+
|  Alice| 25|
|    Bob| 30|
|Charlie| 35|
|  David| 40|
+-------+---+

+-------+--------------+
|   name|       address|
+-------+--------------+
|  Alice|   123 Main St|
|    Bob|   456 Oak Ave|
|Charlie|    789 Elm St|
|  David|1011 Maple Ave|
+-------+--------------+

+-------+------+
|   name|salary|
+-------+------+
|  Alice| 50000|
|    Bob| 60000|
|Charlie| 70000|
|  David| 80000|
+-------+------+

+-------+-----------+
|   name| department|
+-------+-----------+
|  Alice|      Sales|
|    Bob|  Marketing|
|Charlie|Engineering|
|  David|    Finance|
+-------+-----------+



In [0]:
# define a function to join two DataFrames on their shared "name" column
def join_two(left, right):
    return left.join(right, on="name")

# functools.reduce folds the list from left to right, joining all four DataFrames
dfs = [df_names_ages, df_addresses, df_salaries, df_departments]
joined_df = reduce(join_two, dfs)

# show the result
joined_df.show()

+-------+---+--------------+------+-----------+
|   name|age|       address|salary| department|
+-------+---+--------------+------+-----------+
|  Alice| 25|   123 Main St| 50000|      Sales|
|    Bob| 30|   456 Oak Ave| 60000|  Marketing|
|Charlie| 35|    789 Elm St| 70000|Engineering|
|  David| 40|1011 Maple Ave| 80000|    Finance|
+-------+---+--------------+------+-----------+
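The reduce used here is Python's functools.reduce, running on the driver over a list of DataFrame objects, not RDD.reduce(). It folds from the left, so the call above is equivalent to `join_two(join_two(join_two(df_names_ages, df_addresses), df_salaries), df_departments)`. A plain-Python sketch of that nesting order, using strings in place of DataFrames; `describe_join` is a hypothetical stand-in for the real join.

```python
from functools import reduce

def describe_join(left, right):
    # stand-in for left.join(right, "name"): records how the calls nest
    return f"({left} JOIN {right})"

tables = ["names_ages", "addresses", "salaries", "departments"]
print(reduce(describe_join, tables))
# (((names_ages JOIN addresses) JOIN salaries) JOIN departments)
```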



In [0]:
df_1=spark.read.format('csv').option("header","true").load("dbfs:/FileStore/Jan_1_15.csv")
df_2=spark.read.format('csv').option("header","true").load("dbfs:/FileStore/Jan_16_31.csv")
df_3=spark.read.format('csv').option("header","true").load("dbfs:/FileStore/Feb_1_15.csv")
df_4=spark.read.format('csv').option("header","true").load("dbfs:/FileStore/Feb_16_28.csv")
df_1.show(1)
df_2.show(1)
df_3.show(1)
df_4.show(1)

+---+-------------+---------+----------+
| id|       ip_add|  country|      date|
+---+-------------+---------+----------+
|  1|190.89.215.76|Indonesia|10/14/2022|
+---+-------------+---------+----------+
only showing top 1 row

+---+--------------+-------+--------+
| id|        ip_add|country|    date|
+---+--------------+-------+--------+
|  1|46.136.237.248|  China|5/7/2022|
+---+--------------+-------+--------+
only showing top 1 row

+---+-------------+-------+---------+
| id|       ip_add|country|     date|
+---+-------------+-------+---------+
|  1|29.23.123.248|  China|1/13/2023|
+---+-------------+-------+---------+
only showing top 1 row

+---+--------------+-------+---------+
| id|        ip_add|country|     date|
+---+--------------+-------+---------+
|  1|41.172.148.183|  China|1/13/2023|
+---+--------------+-------+---------+
only showing top 1 row



In [0]:
# union() matches columns by position, so all four files must share the same column order
dfs = [df_1, df_2, df_3, df_4]
result = reduce(lambda left, right: left.union(right), dfs)

result.show()

+---+---------------+--------------+----------+
| id|         ip_add|       country|      date|
+---+---------------+--------------+----------+
|  1|  190.89.215.76|     Indonesia|10/14/2022|
|  2|   28.52.25.228|     Indonesia|  3/6/2022|
|  3| 222.148.60.207|       Ukraine| 7/21/2022|
|  4|  98.240.246.84|     Indonesia|  8/3/2022|
|  5|165.212.126.145|        Russia|10/31/2022|
|  6|177.204.154.141|          Peru|  9/6/2022|
|  7|  80.251.247.56|     Indonesia|  9/4/2022|
|  8| 107.155.20.254|     Indonesia| 11/6/2022|
|  9| 201.189.63.244|         China|  7/9/2022|
| 10|177.238.208.225|   Afghanistan| 2/12/2022|
| 11|202.198.239.143|         China| 10/7/2022|
| 12| 24.117.152.203|     Argentina| 1/11/2023|
| 13|   122.96.53.14|      Honduras| 3/23/2022|
| 14| 168.195.251.82|        Belize|  7/4/2022|
| 15|  59.161.50.131|         China| 9/21/2022|
| 16|   231.220.6.41|Czech Republic| 3/15/2022|
| 17| 240.140.109.54|       Ukraine| 6/16/2022|
| 18|   79.218.69.45|      Bulgaria|  1/

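### filter()

- `RDD.filter()` is a transformation that returns a new RDD containing only the elements for which the given function returns True. DataFrames have their own `filter()` method (alias `where()`), which takes a Column condition such as `col("Warranty") == "Out of Warranty"` instead of a Python function.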

In [0]:
appliance_rdd = sc.parallelize([("refrigerator", "In Warranty"), ("dishwasher", "Out of Warranty"),
                                ("microwave", "In Warranty"), ("oven", "Out of Warranty"),
                                ("washer", "Out of Warranty"), ("dryer", "In Warranty"),
                                ("range", "In Warranty"), ("cooktop", "Out of Warranty")])

In [0]:
out_of_warranty_rdd = appliance_rdd.filter(lambda x: x[1] == "Out of Warranty")
out_of_warranty_rdd.collect()

Out[17]: [('dishwasher', 'Out of Warranty'),
 ('oven', 'Out of Warranty'),
 ('washer', 'Out of Warranty'),
 ('cooktop', 'Out of Warranty')]

In [0]:
appliance_df = appliance_rdd.toDF(["Appliance", "Warranty"])
# Filter the DataFrame to only include "Out of Warranty" rows
out_of_warranty_df = appliance_df.filter(col("Warranty") == "Out of Warranty")
out_of_warranty_df.show()

+----------+---------------+
| Appliance|       Warranty|
+----------+---------------+
|dishwasher|Out of Warranty|
|      oven|Out of Warranty|
|    washer|Out of Warranty|
|   cooktop|Out of Warranty|
+----------+---------------+



### transform()
- In PySpark, `DataFrame.transform(func)` (Spark 3.0+) takes a function that accepts a DataFrame and returns a DataFrame, applies it to the whole DataFrame, and returns the result. It works on DataFrames only; RDDs have no transform() method.
- Its main use is chaining reusable steps, e.g. `df.transform(step_one).transform(step_two)` with two such functions, instead of nesting calls. It should not be confused with the array function `pyspark.sql.functions.transform()`, which applies a lambda to every element of an array column.

In [0]:

# Create a sample DataFrame with a sentence column
data = [("Subject: enron methanol ; meter : 988291 Subject",),
        ("Subject: ehronline web address change",),
        ("Subject: photoshop , windows , office . cheap . main trending",)]
df = spark.createDataFrame(data, ["sentence"])

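# Define a function that takes a DataFrame and returns it with a word count column;
# split() breaks the sentence on single spaces and size() counts the resulting tokens
def add_word_count(input_df):
    return input_df.withColumn("word_count", size(split("sentence", " ")))

# transform() passes the whole DataFrame to add_word_count
df = df.transform(add_word_count)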

# Display the resulting DataFrame
df.show(truncate=False)


+-------------------------------------------------------------+----------+
|sentence                                                     |word_count|
+-------------------------------------------------------------+----------+
|Subject: enron methanol ; meter : 988291 Subject             |8         |
|Subject: ehronline web address change                        |5         |
|Subject: photoshop , windows , office . cheap . main trending|11        |
+-------------------------------------------------------------+----------+



### exists()

- In PySpark, ***exists()*** (`pyspark.sql.functions.exists`, Spark 3.1+) is an array function: it takes an array column and a predicate (a lambda that returns a boolean Column) and returns true if the predicate holds for at least one element of the array, and false otherwise. To check whether a DataFrame has a given column, use `"name" in df.columns` instead.

In [0]:
schema = StructType([
    StructField("sensor_id", IntegerType(), True),
    StructField("sensor_data", ArrayType(FloatType()), True)
])

# create a list of tuples with sensor id and sensor data
data = [(1, [1.0,1.4,4.5,-2.0,0.0,1.0]), (2, [3.8,2.0,1.9,2.0,0.0,-1.0]), (3, [0.4,1.9,0.0,8.0,3.0,-1.0])]

# create the DataFrame
df = spark.createDataFrame(data, schema)

# print the DataFrame
df.show()

+---------+--------------------+
|sensor_id|         sensor_data|
+---------+--------------------+
|        1|[1.0, 1.4, 4.5, -...|
|        2|[3.8, 2.0, 1.9, 2...|
|        3|[0.4, 1.9, 0.0, 8...|
+---------+--------------------+



In [0]:
# exists() returns true if at least one element of the array is greater than 5
df.select("*", exists("sensor_data", lambda x: x > 5).alias("any_above_5")).show(truncate=False)

+---------+-------------------------------+-----------+
|sensor_id|sensor_data                    |any_above_5|
+---------+-------------------------------+-----------+
|1        |[1.0, 1.4, 4.5, -2.0, 0.0, 1.0]|false      |
|2        |[3.8, 2.0, 1.9, 2.0, 0.0, -1.0]|false      |
|3        |[0.4, 1.9, 0.0, 8.0, 3.0, -1.0]|true       |
+---------+-------------------------------+-----------+



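### aggregate()
- `pyspark.sql.functions.aggregate(col, initialValue, merge, finish=None)` (Spark 3.1+) is an array function: starting from `initialValue`, it folds `merge(acc, x)` over the elements of an array column in order and returns a single value per row; the optional `finish` function is applied to the final accumulator. It is different from `RDD.aggregate()`, which is an action over all elements of an RDD.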

In [0]:
# sum each array with aggregate(), starting from 0.0, then divide by the array length to get the mean
df = df.withColumn('avg_sensor_value', aggregate('sensor_data', lit(0.0), lambda acc, x: acc + x) / size('sensor_data'))

df.show()

+---------+--------------------+------------------+
|sensor_id|         sensor_data|  avg_sensor_value|
+---------+--------------------+------------------+
|        1|[1.0, 1.4, 4.5, -...|0.9833333293596903|
|        2|[3.8, 2.0, 1.9, 2...| 1.449999988079071|
|        3|[0.4, 1.9, 0.0, 8...|2.0499999970197678|
+---------+--------------------+------------------+

