# Problem 1: SparkSQL (Transaction Data Processing)
Use the Transaction (T) dataset provided (TransID, CustID, TransTotal, TransNumItems) and create a Spark workflow to do the following. [Use SparkSQL & DataFrames to write this workflow.]

Start with T as a dataframe.
1)	T1: Filter out (drop) the transactions from T whose total amount is less than $200

2)	T2: Over T1, group the transactions by number of items, and for each group calculate the sum, average, minimum, and maximum of the total amounts.

3)	Report back T2 to the client side

4)	T3: Over T1, group the transactions by customer ID, and for each group report the customer ID, and the transactions’ count.

5)	T4: Filter out (drop) the transactions from T whose total amount is less than $600

6)	T5: Over T4, group the transactions by customer ID, and for each group report the customer ID, and the transactions’ count.

7)	T6: Select the customer IDs whose T5.count * 5 < T3.count

8)	Report back T6 to the client side
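
To make the filter-and-aggregate steps concrete, here is a minimal plain-Python sketch of T1 and T2 on a handful of made-up rows. The sample values and the names `sample_rows`, `t1_filter`, and `t2_aggregate` are hypothetical and only illustrate the expected semantics; this is not the Spark implementation.

```python
from collections import defaultdict

# Hypothetical sample rows: (TransID, CustID, TransTotal, TransNumItems)
sample_rows = [
    (1, 10, 150.0, 2),   # dropped by T1 (total < 200)
    (2, 10, 250.0, 2),
    (3, 11, 750.0, 2),
    (4, 12, 200.0, 5),   # kept: exactly 200 is not "less than 200"
    (5, 12, 900.0, 5),
]

def t1_filter(rows, threshold=200.0):
    """T1: keep rows whose TransTotal is at least `threshold`."""
    return [r for r in rows if r[2] >= threshold]

def t2_aggregate(rows):
    """T2: per TransNumItems, compute sum, average, min and max of TransTotal."""
    groups = defaultdict(list)
    for _, _, total, num_items in rows:
        groups[num_items].append(total)
    return {
        k: {"sum": sum(v), "avg": sum(v) / len(v), "min": min(v), "max": max(v)}
        for k, v in groups.items()
    }

t1 = t1_filter(sample_rows)
t2 = t2_aggregate(t1)
for num_items in sorted(t2):
    print(num_items, t2[num_items])
```

Note that "drop totals less than $200" translates to keeping rows with `TransTotal >= 200`, so a transaction of exactly $200 stays in T1.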

# Part 1: SQL

import warnings
warnings.filterwarnings('ignore')

from pyspark.sql import SparkSession
from pyspark.sql.types import StructType, StructField, IntegerType, FloatType

def main():
    spark = SparkSession.builder \
        .appName("TransactionAnalysisSparkSQL") \
        .getOrCreate()
    spark.sparkContext.setLogLevel("ERROR")

    # Define schema
    schema = StructType([
        StructField("TransID", IntegerType(), True),
        StructField("CustID", IntegerType(), True),
        StructField("TransTotal", FloatType(), True),
        StructField("TransNumItems", IntegerType(), True)
    ])

    # Load the transactions dataset
    transaction_df = spark.read.csv("Transactions.csv", schema=schema, sep=",")
    transaction_df.createOrReplaceTempView("transactions")

    # T1: Keep transactions with total amount >= $200 (drop those below $200)
    spark.sql("""
        SELECT * FROM transactions
        WHERE TransTotal >= 200
    """).createOrReplaceTempView("t1")

    # T2: Group T1 by TransNumItems, compute sum/avg/min/max of TransTotal, then report
    spark.sql("""
        SELECT TransNumItems,
               SUM(TransTotal) AS TotalSum,
               AVG(TransTotal) AS TotalAvg,
               MIN(TransTotal) AS TotalMin,
               MAX(TransTotal) AS TotalMax
        FROM t1
        GROUP BY TransNumItems
    """).show()

    # T3: Group by CustID and count transactions, then report
    t3_query = spark.sql("""
        SELECT CustID, COUNT(*) AS TransCount
        FROM t1
        GROUP BY CustID
    """)
    t3_query.createOrReplaceTempView("t3")
    t3_query.show()  # Report T3

    # T4: Keep transactions with total amount >= $600 (drop those below $600)
    spark.sql("""
        SELECT * FROM transactions
        WHERE TransTotal >= 600
    """).createOrReplaceTempView("t4")

    # T5: Group by CustID and count transactions, then report
    t5_query = spark.sql("""
        SELECT CustID, COUNT(*) AS TransCount
        FROM t4
        GROUP BY CustID
    """)
    t5_query.createOrReplaceTempView("t5")
    t5_query.show()  # Report T5

    # T6: Select customer IDs where T5 count * 5 < T3 count.
    # The inner join only compares customers that appear in both T3 and T5.
    spark.sql("""
        SELECT t3.CustID
        FROM t3 JOIN t5 ON t3.CustID = t5.CustID
        WHERE t5.TransCount * 5 < t3.TransCount
    """).show()

    spark.stop()

if __name__ == "__main__":
    main()

    


+-------------+--------------------+-----------------+--------+--------+
|TransNumItems|            TotalSum|         TotalAvg|TotalMin|TotalMax|
+-------------+--------------------+-----------------+--------+--------+
|            1|5.0861289479537964E7|601.5456881590752|  200.01|  999.99|
|            6| 5.073010268737793E7| 599.221623994542|   200.0|  999.95|
|            3| 5.102334812852478E7|600.3806333885366|  200.01|  999.99|
|            5| 5.080746092744446E7|600.3764910009271|   200.0|  999.97|
|            9|5.0801653669906616E7|600.0738689319105|   200.0|  999.99|
|            4| 5.075956238415527E7|599.6262626300061|  200.01|  999.99|
|            8|5.0691202193374634E7|600.0805241065254|  200.01|  999.97|
|            7| 5.106912347918701E7|601.2517775222752|   200.0|  999.97|
|           10| 5.099957808786011E7|599.0858354715797|  200.01|  999.97|
|            2| 5.065183534359741E7|599.0400963112461|   200.0|  999.98|
+-------------+--------------------+-----------------+--------+--------+
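
The long fractional digits in TotalSum and TotalAvg are consistent with TransTotal being declared as `FloatType` (32-bit) in the schema: a value such as 200.01 cannot be stored exactly in single precision, and the rounding becomes visible once the values are summed or averaged in double precision. The following standard-library sketch illustrates the effect outside Spark; `to_float32` is a hypothetical helper, not a Spark function. Whether a wider type is preferable here is a judgment call the assignment does not specify.

```python
import struct

def to_float32(x):
    """Round a Python float (64-bit) to the nearest 32-bit float value."""
    return struct.unpack("f", struct.pack("f", x))[0]

exact = 200.01
stored = to_float32(exact)

print(repr(stored))          # close to, but not exactly, 200.01
print(abs(stored - exact))   # small single-precision rounding error
```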

                                                                                

+------+----------+
|CustID|TransCount|
+------+----------+
| 24663|        20|
| 39432|        16|
| 39285|         9|
| 32592|        13|
| 21220|        26|
| 29894|        18|
|  8389|        13|
|   148|        18|
| 16861|         7|
|  7253|        17|
| 48398|        20|
| 22346|        17|
| 36224|        20|
| 29814|        11|
|  2366|        15|
| 16339|        26|
| 32855|        19|
| 28088|        18|
| 18800|        20|
| 33412|        16|
+------+----------+
only showing top 20 rows



                                                                                

+------+----------+
|CustID|TransCount|
+------+----------+
| 24663|        12|
| 39285|         4|
| 32592|         7|
| 21220|        16|
| 29894|        10|
|   148|        10|
| 16861|         3|
| 48398|        12|
| 22346|         9|
| 36224|        14|
| 29814|         6|
|  2366|         7|
| 32855|        10|
| 28088|        10|
| 18800|         8|
| 33412|         7|
|  7240|         9|
| 31983|        10|
| 18979|         7|
| 47501|        12|
+------+----------+
only showing top 20 rows



                                                                                

+------+
|CustID|
+------+
|  8389|
| 29950|
| 32525|
| 26090|
|   597|
| 22414|
| 11450|
|  1226|
| 10015|
| 23038|
| 35635|
|  8967|
| 33650|
| 46677|
|  6852|
|  3260|
| 36643|
| 33919|
| 31451|
| 20793|
+------+
only showing top 20 rows
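
One point worth checking in T6: the query uses an inner join, so a customer who appears in T3 but has no transaction of $600 or more (and therefore no row in T5) is never compared. If such customers should be treated as having a T5 count of 0, a left join with a default of 0 would include them. The assignment text does not say which reading is intended, so the plain-Python sketch below shows both on made-up counts; `t3_counts`, `t5_counts`, and both function names are hypothetical.

```python
# Hypothetical per-customer counts: CustID -> number of transactions
t3_counts = {1: 12, 2: 10, 3: 4}   # transactions >= $200
t5_counts = {1: 2, 2: 5}           # transactions >= $600 (customer 3 has none)

def t6_inner(t3, t5):
    """Inner-join reading: compare only customers present in both T3 and T5."""
    return sorted(c for c in t3 if c in t5 and t5[c] * 5 < t3[c])

def t6_left(t3, t5):
    """Left-join reading: a customer missing from T5 is treated as count 0."""
    return sorted(c for c in t3 if t5.get(c, 0) * 5 < t3[c])

print(t6_inner(t3_counts, t5_counts))  # [1]
print(t6_left(t3_counts, t5_counts))   # [1, 3]
```

Customer 3 is the only difference between the two results: with no row in T5, it is skipped by the inner-join reading and selected by the left-join reading because 0 * 5 < 4.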

