In [8]:
# TASK 1
# Big Data Query & Analysis using Spark SQL [30 marks]


from pyspark.sql import SparkSession

# create a SparkSession object
spark = SparkSession.builder.appName("my_app").getOrCreate()

# load your data into a DataFrame
df = spark.read.format("csv").option("header", True).load("data.csv")

# create a temporary view of the DataFrame
df.createOrReplaceTempView("my_table")

# run a SQL query against the view
sql = "SELECT `Dst Port`, (COUNT(CASE WHEN `FIN Flag Cnt` >= 2 THEN 1 END) / COUNT(*) * 100) AS FIN_Flag_Percentage FROM my_table GROUP BY `Dst Port` ORDER BY FIN_Flag_Percentage DESC"
result = spark.sql(sql)

# display the result
result.show()




+--------+-------------------+
|Dst Port|FIN_Flag_Percentage|
+--------+-------------------+
|   57868|                0.0|
|   35640|                0.0|
|   51946|                0.0|
|   37796|                0.0|
|   43044|                0.0|
|   34622|                0.0|
|   57048|                0.0|
|   54580|                0.0|
|   52996|                0.0|
|   37806|                0.0|
|   40090|                0.0|
|   35792|                0.0|
|   46664|                0.0|
|   60042|                0.0|
|   37128|                0.0|
|   44656|                0.0|
|   49978|                0.0|
|   50214|                0.0|
|   53624|                0.0|
|   56400|                0.0|
+--------+-------------------+
only showing top 20 rows



                                                                                

In [9]:
from pyspark.sql import SparkSession

# create a SparkSession object
spark = SparkSession.builder.appName("my_app").getOrCreate()

# load your data into a DataFrame
df = spark.read.format("csv").option("header", True).load("data.csv")

# create a temporary view of the DataFrame
df.createOrReplaceTempView("my_table")

# run a SQL query against the view
sql = "SELECT Protocol, AVG(`Flow Duration`) AS Avg_Flow_Duration FROM my_table GROUP BY Protocol ORDER BY Avg_Flow_Duration DESC LIMIT 5"
result = spark.sql(sql)

# display the result
result.show()



+--------+--------------------+
|Protocol|   Avg_Flow_Duration|
+--------+--------------------+
|       6|   9549803.667116841|
|      17|  376613.15135208116|
|       0|-1.21059038651405...|
+--------+--------------------+



                                                                                

In [27]:
from pyspark.ml.feature import StringIndexer
from pyspark.ml.feature import VectorAssembler
from pyspark.ml import Pipeline
from pyspark.sql.functions import col

# Define categorical columns
cat_cols = ["Protocol"]
data = spark.read.format("csv").option("header", True).load("data.csv")

# Define StringIndexer for categorical columns
indexers = [StringIndexer(inputCol=col, outputCol=col+"_index") for col in cat_cols]

# Define VectorAssembler to combine all features into a single vector column
assembler = VectorAssembler(inputCols=[col+"_index" for col in cat_cols], outputCol="features")

# Fit the StringIndexer and VectorAssembler on the DataFrame
indexer_model = Pipeline(stages=indexers)
data_indexed = indexer_model.fit(data).transform(data)
data_assembled = assembler.transform(data_indexed)
data_assembled = data_assembled.withColumn("Flow Duration", col("Flow Duration").cast("double"))

# Perform ChiSquareTest
chisqTest = ChiSquareTest.test(data_assembled, "features", "Flow Duration")

# Print p-values for each feature
print(chisqTest.pValues)


                                                                                

Column<'pValues'>
