In [1]:
from math import atan2, cos, radians, sin, sqrt

from pyspark.sql import SparkSession
from pyspark.sql.functions import avg, col, countDistinct, lit, udf, when, window
from pyspark.sql.types import DoubleType, StringType, StructField, StructType, TimestampType


In [2]:
# Start (or reuse, if one is already active in this kernel) the Spark session for the notebook.
spark = SparkSession.builder.appName("LocationProcessing").getOrCreate()


In [3]:

# Explicit schema for the input CSV: column name -> Spark type, in file column order.
# Every field uses StructField's default nullability.
schema = StructType([
    StructField(column_name, column_type)
    for column_name, column_type in (
        ("LocationIndex", StringType()),
        ("DeviceID", StringType()),
        ("Latitude", DoubleType()),
        ("Longitude", DoubleType()),
        ("Timestamp_date", TimestampType()),
        ("CompanyName", StringType()),
        ("Timestamp", TimestampType()),
        ("PinCode", StringType()),
        ("City", StringType()),
    )
])




In [4]:
# Load the data.
# NOTE: machine-specific absolute path — point INPUT_PATH at your own copy of the file.
INPUT_PATH = "C:/Users/Hp/OneDrive/Desktop/New folder/ss.csv"  # Replace with the actual path

# Development cap on the number of rows read; set to None to process the whole file.
# Spark does not guarantee which rows limit() keeps without an orderBy — TODO confirm
# whether a specific subset is required.
ROW_LIMIT = 1000

raw_csv = (
    spark.read
    .schema(schema)
    .option("header", "true")
    .csv(INPUT_PATH)
)
raw_data = raw_csv.limit(ROW_LIMIT) if ROW_LIMIT is not None else raw_csv

In [5]:
raw_data.show(n=2)  # quick sanity check of the parsed schema

+---------------+--------------------+----------+----------+-------------------+--------------------+-------------------+-------+-----------+
|  LocationIndex|            DeviceID|  Latitude| Longitude|     Timestamp_date|         CompanyName|          Timestamp|PinCode|       City|
+---------------+--------------------+----------+----------+-------------------+--------------------+-------------------+-------+-----------+
|89608b016c3ffff|e38ed0cf-7ce7-4e7...|   18.9721|  72.82836|2020-06-24 00:00:00| Syscon Infoway Pvt.|2020-06-24 13:02:44| 400070|Mumbai City|
|89608b016c3ffff|928d0c85-7cf0-4c6...|18.9716658|72.8264716|2020-06-12 00:00:00|Microscan Compute...|2020-06-12 14:01:04| 400070|Mumbai City|
+---------------+--------------------+----------+----------+-------------------+--------------------+-------------------+-------+-----------+
only showing top 2 rows



In [6]:
# Keep only the columns used downstream (drops Timestamp_date and CompanyName).
selected_columns = [
    "LocationIndex",
    "DeviceID",
    "Latitude",
    "Longitude",
    "Timestamp",
    "PinCode",
    "City",
]
filtered_data = raw_data.select(selected_columns)


In [7]:
filtered_data.show(n=20, truncate=True)

+---------------+--------------------+----------+----------+-------------------+-------+-----------+
|  LocationIndex|            DeviceID|  Latitude| Longitude|          Timestamp|PinCode|       City|
+---------------+--------------------+----------+----------+-------------------+-------+-----------+
|89608b016c3ffff|e38ed0cf-7ce7-4e7...|   18.9721|  72.82836|2020-06-24 13:02:44| 400070|Mumbai City|
|89608b016c3ffff|928d0c85-7cf0-4c6...|18.9716658|72.8264716|2020-06-12 14:01:04| 400070|Mumbai City|
|89608b016c3ffff|fe1cea84-51c5-480...| 18.971809| 72.826249|2020-06-25 17:47:17| 400074|Mumbai City|
|89608b016c3ffff|998f05d3-77de-436...| 18.970652|72.8274182|2020-06-12 15:00:22| 400070|Mumbai City|
|89608b016c3ffff|fe1cea84-51c5-480...| 18.971809| 72.826249|2020-06-25 17:47:47| 400074|Mumbai City|
|89608b016c3ffff|b33da977-926d-4c0...| 18.971178| 72.827174|2020-06-12 17:46:25| 400070|Mumbai City|
|89608b016c3ffff|fe1cea84-51c5-480...| 18.971809| 72.826249|2020-06-25 17:47:47| 400074|Mum

In [8]:
# Define the function behind the Haversine-distance UDF
def haversine(lat1, lon1, lat2, lon2, radius_of_earth=6371000):
    """Great-circle distance between two points given in decimal degrees.

    Args:
        lat1, lon1: latitude / longitude of the first point, in degrees.
        lat2, lon2: latitude / longitude of the second point, in degrees.
        radius_of_earth: sphere radius; the default is Earth's mean radius in
            metres, so the result is in metres unless this is overridden.

    Returns:
        The distance in the same unit as ``radius_of_earth``, or ``None`` when any
        coordinate is ``None``. Spark hands SQL NULLs to Python UDFs as ``None``;
        returning ``None`` yields a NULL distance instead of failing the job.
    """
    if any(value is None for value in (lat1, lon1, lat2, lon2)):
        return None

    # Convert latitude and longitude from degrees to radians
    lat1, lon1, lat2, lon2 = map(radians, [lat1, lon1, lat2, lon2])

    # Haversine formula implementation
    dlat = lat2 - lat1
    dlon = lon2 - lon1
    a = sin(dlat / 2) ** 2 + cos(lat1) * cos(lat2) * sin(dlon / 2) ** 2
    # Rounding can push `a` a hair outside [0, 1] (near-antipodal points), which
    # would make sqrt(1 - a) raise "math domain error".
    a = min(1.0, max(0.0, a))
    c = 2 * atan2(sqrt(a), sqrt(1 - a))

    return radius_of_earth * c

# Wrap haversine() as a Spark UDF returning a double (distance in metres).
haversine_udf = udf(haversine, returnType=DoubleType())


In [9]:
# Analysis parameters. (haversine_udf is registered once, next to haversine().)

# Search radius around each location ping, in metres — same unit haversine() returns.
radius = 200

# Tumbling time-window length, as a duration string for pyspark.sql.functions.window.
time_window_duration = "10 minutes"



In [10]:
# For every time window, compute the average number of *other* devices seen within
# `radius` metres of a location ping in that window.
#
# Each ping must be compared with the other pings in the same window (a self-join).
# Comparing a row's coordinates with themselves always gives distance 0, which would
# make the "average" a constant 1.
pings = (
    filtered_data
    .dropna(subset=["DeviceID", "Latitude", "Longitude", "Timestamp"])
    .select("DeviceID", "Latitude", "Longitude", "Timestamp")
    .dropDuplicates()  # identical repeated pings would otherwise be counted twice
    .withColumn("TimeWindow", window("Timestamp", time_window_duration))
)

# Inner join on the window only. Each ping also pairs with itself (distance 0), so
# every ping survives the radius filter and a ping with no neighbours counts as 0.
# NOTE: the pair count grows quadratically with pings per window; add a coarse
# spatial key (e.g. LocationIndex or a lat/lon grid cell) before running on the
# full data set.
pairs_in_radius = (
    pings.alias("a")
    .join(pings.alias("b"), col("a.TimeWindow") == col("b.TimeWindow"))
    .where(
        haversine_udf(
            col("a.Latitude"), col("a.Longitude"), col("b.Latitude"), col("b.Longitude")
        ) <= radius
    )
)

# Distinct neighbouring devices per ping. The ping's own device maps to NULL,
# which countDistinct ignores.
neighbours_per_ping = (
    pairs_in_radius
    .groupBy(
        col("a.TimeWindow").alias("TimeWindow"),
        col("a.DeviceID").alias("DeviceID"),
        col("a.Latitude").alias("Latitude"),
        col("a.Longitude").alias("Longitude"),
        col("a.Timestamp").alias("Timestamp"),
    )
    .agg(
        countDistinct(
            when(col("b.DeviceID") != col("a.DeviceID"), col("b.DeviceID"))
        ).alias("DevicesInRadius")
    )
)

result = (
    neighbours_per_ping
    .groupBy("TimeWindow")
    .agg(avg("DevicesInRadius").alias("AvgDevicesInRadius"))
    .orderBy("TimeWindow")
)



In [None]:
# Save the results to CSV.
# Spark writes a *directory* at this path, containing one part-*.csv file per partition.
OUTPUT_PATH = "C:/Users/Hp/OneDrive/Desktop/New folder/result.csv"  # Replace with the desired path

# The CSV writer cannot serialise struct columns such as the one window() produces,
# so flatten it into plain start/end timestamp columns first.
result_to_write = result
if "TimeWindow" in result.columns:
    result_to_write = result.select(
        result["TimeWindow"].getField("start").alias("WindowStart"),
        result["TimeWindow"].getField("end").alias("WindowEnd"),
        *[name for name in result.columns if name != "TimeWindow"],
    )

(
    result_to_write.write
    .mode("overwrite")
    .option("header", "true")  # mirror the headered input file
    .csv(OUTPUT_PATH)
)

# Stop the Spark session
spark.stop()