In [1]:
!pip install pyspark

Collecting pyspark
  Downloading pyspark-3.5.0.tar.gz (316.9 MB)
[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m316.9/316.9 MB[0m [31m2.1 MB/s[0m eta [36m0:00:00[0m
[?25h  Preparing metadata (setup.py) ... [?25l[?25hdone
Building wheels for collected packages: pyspark
  Building wheel for pyspark (setup.py) ... [?25l[?25hdone
  Created wheel for pyspark: filename=pyspark-3.5.0-py2.py3-none-any.whl size=317425344 sha256=66a710fcdad108b61514d064f8495882a8967f7f1a82903771fb664f26709caa
  Stored in directory: /root/.cache/pip/wheels/41/4e/10/c2cf2467f71c678cfc8a6b9ac9241e5e44a01940da8fbb17fc
Successfully built pyspark
Installing collected packages: pyspark
Successfully installed pyspark-3.5.0


In [2]:
from pyspark import SparkConf, SparkContext

In [3]:
if __name__ == "__main__":
  conf = SparkConf().setAppName("Tushar").setMaster("local[3]")
  sc = SparkContext(conf = conf)

In [19]:
# Sample data for demonstration
data = [
    ("A", 1),
    ("B", 2),
    ("A", 3),
    ("B", 4),
    ("C", 5),
    ("A", 6),
    ("C", 7),
    ("A", 8),
    ("B", 9),
    ("C", 10),
]

In [20]:
# Create an RDD from the sample data
rdd = sc.parallelize(data)

In [21]:
# Transformation 1: groupBy
grouped_data = rdd.groupBy(lambda x: x[0])

In [22]:
# Display the result of groupBy
print("groupBy:")
for key, values in grouped_data.collect():
    print(f"Key: {key}, Values: {list(values)}")

groupBy:
Key: B, Values: [('B', 2), ('B', 4), ('B', 9)]
Key: C, Values: [('C', 5), ('C', 7), ('C', 10)]
Key: A, Values: [('A', 1), ('A', 3), ('A', 6), ('A', 8)]


In [30]:
# Transformation 2: groupByKey
grouped_by_key = rdd.groupByKey()

In [31]:
# Display the result of groupByKey
print("\ngroupByKey:")
for key, values in grouped_by_key.collect():
    print(f"Key: {key}, Values: {list(values)}")


groupByKey:
Key: B, Values: [2, 4, 9]
Key: C, Values: [5, 7, 10]
Key: A, Values: [1, 3, 6, 8]


In [32]:
# Transformation 3: reduceByKey
sum_values = rdd.reduceByKey(lambda x, y: x + y)

In [35]:
sum_values.collect()

[('B', 15), ('C', 22), ('A', 18)]

In [33]:
# Display the result of reduceByKey
print("\nreduceByKey:")
for key, value in sum_values.collect():
    print(f"Key: {key}, Sum: {value}")


reduceByKey:
Key: B, Sum: 15
Key: C, Sum: 22
Key: A, Sum: 18


Imagine you have a large dataset containing social media posts from various users. Each entry includes information about the user, the post content, and the number of likes and shares.

In [36]:
# Load social media data from files into RDD
social_media_data = sc.textFile("social_media_data.txt")

In [39]:
social_media_data.count()

10000

In [41]:
social_media_data.take(3)

['user4\taHjurVmhYPQprrcM1h4i #PySpark #Analytics\t86\t29',
 'user1\tAc2nRoef6LoHl7b6HfJe #MachineLearning\t71\t7',
 'user3\tbnkzmGbjG0FEvYhGGpWx #PySpark #DataScience #MachineLearning\t49\t40']

In [37]:
# Parse each social media entry and extract relevant information
def parse_social_media_entry(entry):
    # Assuming a tab-separated format with user, post content, likes, and shares
    parts = entry.split("\t")
    return (parts[0], parts[1], int(parts[2]), int(parts[3]))

In [38]:
# Apply the parse function to each social media entry
parsed_data = social_media_data.map(parse_social_media_entry)

In [42]:
parsed_data.take(4)

[('user4', 'aHjurVmhYPQprrcM1h4i #PySpark #Analytics', 86, 29),
 ('user1', 'Ac2nRoef6LoHl7b6HfJe #MachineLearning', 71, 7),
 ('user3',
  'bnkzmGbjG0FEvYhGGpWx #PySpark #DataScience #MachineLearning',
  49,
  40),
 ('user3',
  'zow8Shu9Zly2d1qrtXER #DataScience #MachineLearning #PySpark',
  6,
  17)]

In [43]:
# Transformation 1: Calculate the total engagement (likes + shares) for each user
user_engagement = parsed_data.map(lambda x: (x[0], x[2] + x[3]))

In [44]:
user_engagement.take(4)

[('user4', 115), ('user1', 78), ('user3', 89), ('user3', 23)]

In [None]:
total_engagment = user_engagement.reduceByKey(lambda x,y: x+y)
total_engagment.collect()

In [45]:
# Transformation 2: Find the top 5 users with the highest engagement
top_users = user_engagement.takeOrdered(5, key=lambda x: -x[1])

In [49]:
# Action 1: Display the top users
print("Top 5 Users with Highest Engagement:")
for user, engagement in top_users:
    print(f"User: {user}, Engagement: {engagement}")

Top 5 Users with Highest Engagement:
User: user1, Engagement: 150
User: user2, Engagement: 150
User: user1, Engagement: 150
User: user5, Engagement: 150
User: user5, Engagement: 149


In [50]:
# Transformation 3: Extract hashtags from post content
def extract_hashtags(entry):
    # Assuming hashtags start with #
    
    for i in entry[1].split():
        if i.startswith("#"):
            return (i,1)
    # return [(tag, 1) for tag in entry[1].split() if tag.startswith("#")]

In [54]:
# Apply the extract_hashtags function to each social media entry
hashtags = parsed_data.flatMap(extract_hashtags)

In [55]:
hashtags.take(5)

[('#PySpark', 1),
 ('#Analytics', 1),
 ('#MachineLearning', 1),
 ('#PySpark', 1),
 ('#DataScience', 1)]

In [56]:
hashtag_counts = hashtags.reduceByKey(lambda x, y: x + y)

In [58]:
hashtag_counts.collect()

[('#Analytics', 3901),
 ('#BigData', 4020),
 ('#PySpark', 4032),
 ('#MachineLearning', 3945),
 ('#DataScience', 4009)]

In [59]:
# Transformation 4: Find the top 5 hashtags
top_hashtags = hashtag_counts.takeOrdered(5, key=lambda x: -x[1])

In [60]:
# Action 2: Display the top hashtags
print("\nTop 5 Hashtags:")
for hashtag, count in top_hashtags:
    print(f"Hashtag: {hashtag}, Count: {count}")


Top 5 Hashtags:
Hashtag: #PySpark, Count: 4032
Hashtag: #BigData, Count: 4020
Hashtag: #DataScience, Count: 4009
Hashtag: #MachineLearning, Count: 3945
Hashtag: #Analytics, Count: 3901
