<a href="https://colab.research.google.com/github/hanarayan/EPAM_PRACTICE/blob/main/PracticeEPAM.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [None]:
!pip install pyspark

In [None]:
from pyspark import SparkContext
import csv,json , re
from io import StringIO
# Reuse the notebook's active SparkContext, or start a local one if none exists.
# SparkContext.getOrCreate() does not raise ValueError when no context is
# running -- it simply creates one -- so a try/except ValueError fallback around
# it is unreachable and cannot tell "existing" from "new". Pass the intended
# configuration to getOrCreate() instead; it is ignored if a context is already
# active, so report the context that is actually in use.
from pyspark import SparkConf

sc = SparkContext.getOrCreate(
    SparkConf().setMaster("local").setAppName("Linkdin Example")
)
print(f"Using SparkContext '{sc.appName}' (master: {sc.master})")


In [None]:
# Read the CSV export as an RDD of text lines.
rdd = sc.textFile("/content/LinkedIn people profiles datasets.csv")


def _comma_quote_to_tab(line):
    """Rewrite every comma that directly precedes a double quote as a tab.

    This turns the separators between quoted CSV fields into tabs so rows can
    later be parsed with a tab delimiter.

    NOTE(review): the rewrite is purely textual. It also hits ',"' sequences
    inside quoted values (e.g. JSON columns), and it leaves a comma in place
    when the next field is unquoted. clean_string later maps tabs back to
    commas, presumably to undo the first effect. textFile yields one element
    per physical line, so any quoted value spanning several lines would be
    split apart.
    """
    return line.replace(',"', '\t"')


rdd_raw = rdd.map(_comma_quote_to_tab)
print(rdd_raw.take(2))

In [None]:
# The file's first line is the CSV header; every line equal to it is dropped.
header = rdd_raw.first()
rdd_no_header = rdd_raw.filter(lambda text: text != header)

# Each count() is a separate Spark job over the whole file.
total_rows = rdd_raw.count()
print(f"Total rows (Including header): {total_rows}")
data_rows = rdd_no_header.count()
print(f"Total rows (excluding header): {data_rows}")

In [None]:


# Split the (tab-rewritten) header line into column names. Each name has
# surrounding double quotes removed, then surrounding whitespace, and is
# lowercased.
header_columns = [name.strip('"').strip().lower() for name in header.split("\t")]

print("Available Columns:")
for position, column in enumerate(header_columns):
    print(f"Index: {position}, Column: {column}")



In [74]:
# Resolve the position of every column the analyses below read.
# list.index raises ValueError if any of these names is missing from the header.
(
    country_code_index,
    name_index,
    region_index,
    company_name_index,
    following_index,
    post_index,
    education_index,
    certification_index,
    recommendations_index,
) = (
    header_columns.index(column_name)
    for column_name in (
        "country_code",
        "name",
        "region",
        "current_company:name",
        "following",
        "posts",
        "education",
        "certifications",
        "recommendations_count",
    )
)

In [None]:
def parse_csv_line(lines):
    """Parse an iterable of tab-delimited text lines into lists of fields.

    Intended for RDD.mapPartitions: ``lines`` is one partition's lines. The
    csv module handles double-quoted fields, so tabs inside quotes are kept
    and doubled quotes ("") are unescaped to a single quote.

    Returns a list containing one list of strings per parsed record.
    """
    return list(csv.reader(lines, delimiter="\t"))

# Drop the header line and parse the remaining lines into lists of fields.
# rdd_split feeds every analysis cell below. Cache it so each later action
# does not re-read and re-parse the whole CSV file from scratch.
rdd_split = rdd_raw.filter(lambda row: row != header)
rdd_split = rdd_split.mapPartitions(parse_csv_line).cache()

print(rdd_split.take(1))


In [76]:

def clean_column(code):
    """Normalise a raw field value into a display string.

    The value is converted with str(), trimmed of whitespace, then trimmed of
    surrounding double quotes. An empty result, or one of the placeholders
    "null", "no data" or "--" (case-insensitive), becomes "Not Available".
    Any other value is returned in its trimmed form.
    """
    text = str(code).strip().strip('"')
    is_placeholder = text.lower() in ("null", "no data", "--")
    return "Not Available" if (text == "" or is_placeholder) else text

In [77]:
def clean_digit(digit_value):
    """Convert a count-like field to a non-negative int, defaulting to 0.

    Accepts strings or ints. Surrounding whitespace and "," thousands
    separators are ignored, so " 1,234 " -> 1234. Any other value that is not
    made purely of decimal digits yields 0, e.g. "Not Available", "500+",
    "-3", "12.5".
    """
    # str.isdigit() is also true for characters such as superscript digits
    # that int() rejects. str.isdecimal() accepts only the digits int() can
    # parse, so the conversion below cannot raise.
    text = str(digit_value).strip().replace(",", "")
    return int(text) if text.isdecimal() else 0

In [None]:
print(country_code_index)

# Distinct cleaned country codes, ordered case-insensitively and collected to
# the driver as a list of str.
country_codes_rdd = rdd_split.map(lambda fields: clean_column(fields[country_code_index]))
distinct_countries = (
    country_codes_rdd
    .distinct()
    .sortBy(lambda code: code.lower())
    .collect()
)

print("Distinct Country Codes:")
for position, country in enumerate(distinct_countries, 1):
    print(f"{position}. {country}")

In [None]:
specific_country_code = "CA"

# Rows whose cleaned country code equals the target code.
filtered_profiles_rdd = rdd_split.filter(
    lambda fields: clean_column(fields[country_code_index]) == specific_country_code
)

# Cleaned profile names for those rows.
profile_names_rdd = filtered_profiles_rdd.map(lambda fields: clean_column(fields[name_index]))

# De-duplicate, order case-insensitively, then bring the names to the driver.
sorted_profiles = (
    profile_names_rdd
    .distinct()
    .sortBy(lambda profile_name: profile_name.lower())
    .collect()
)

print(f"Profiles from {specific_country_code}:")
for position, profile in enumerate(sorted_profiles, 1):
    print(f"{position}. {profile}")


In [None]:
# One (cleaned region, 1) pair per profile, then the total per region.
regions_rdd = rdd_split.map(lambda fields: (clean_column(fields[region_index]), 1))
print(regions_rdd.take(5))
region_counts = regions_rdd.reduceByKey(lambda total, one: total + one)
print(region_counts.take(5))

# Region names are unique after reduceByKey, so ordering by the key alone gives
# the same order as ordering the whole (region, count) pair. Case-sensitive.
region_counts_result = region_counts.sortByKey().collect()

print("Region:")
for position, (region, count) in enumerate(region_counts_result, 1):
    print(f"{position}. {region}, Count: {count}")


In [None]:
# Cleaned current-company name for every profile.
company_names_rdd = rdd_split.map(lambda fields: clean_column(fields[company_name_index]))

# NOTE: despite its name, this holds the collected Python list, not an RDD.
distinct_company_names_rdd = (
    company_names_rdd
    .distinct()
    .sortBy(lambda company_name: company_name.lower())
    .collect()
)

print("Distinct company names:")
for position, company in enumerate(distinct_company_names_rdd, 1):
    print(f"{position}. {company}")

In [None]:
# (following count, cleaned name) per profile. The count is parsed by
# clean_digit, which yields 0 for values it cannot parse.
people_rdd = rdd_split.map(
    lambda fields: (
        clean_digit(clean_column(fields[following_index])),
        clean_column(fields[name_index]),
    )
)
print(people_rdd.take(5))

# Highest count first; only the first ten rows are brought to the driver.
sorted_people_rdd = people_rdd.sortBy(lambda pair: pair[0], ascending=False)
top_10_followed = sorted_people_rdd.take(10)

# NOTE(review): the ranked value comes from the "following" column, but the
# output labels it "followers". Confirm which column is intended.
print("Top 10 Most-Followed People:")
for position, (following, name) in enumerate(top_10_followed, 1):
    print(f"{position}. {name}: {following} followers")


In [83]:
def clean_string(raw_str):
    """Normalise a raw JSON-like field so json.loads has a chance to parse it.

    Rewrites are applied in this exact order:
      1. trim surrounding whitespace;
      2. turn every tab into a comma (undoing the ',"' -> tab rewrite made
         when the file was loaded);
      3. drop a single quote that follows '[' or precedes ']';
      4. replace every '""' with 'null'.
    """
    text = raw_str.strip()
    for old, new in (("\t", ","), ("['", "["), ("']", "]"), ('""', "null")):
        text = text.replace(old, new)
    return text

def safe_json_loads(raw_str):
    """Parse ``raw_str`` as JSON, returning None instead of raising on bad input.

    A decode failure is reported with print() and yields None; a literal JSON
    ``null`` also yields None. When this runs inside Spark tasks, the printed
    text may land in worker/executor output rather than the notebook.
    """
    try:
        parsed = json.loads(raw_str)
    except json.JSONDecodeError as exc:
        print(f"JSON Parse Error: {raw_str} | Error: {exc}")  # Debugging output
        parsed = None
    return parsed

In [None]:
def extract_profile_likes(row):
    """Return one (profile name, 1) pair per post in the row's 'posts' JSON.

    The name comes from the post's "attribution" string with the text
    "Liked by " removed. When a post has no usable attribution (missing, null,
    empty or not a string), the row's own name column is used instead.

    Returns an empty list when the posts field does not parse to a JSON list.
    List entries that are not JSON objects are skipped.
    """
    post_list = safe_json_loads(clean_string(row[post_index]))
    # Only a JSON array of objects has the expected shape. Anything else
    # (null, a bare object whose iteration yields string keys, a string) used
    # to raise AttributeError on .get() and fail the whole Spark stage.
    if not isinstance(post_list, list):
        return []

    extracted_data = []
    for post in post_list:
        if not isinstance(post, dict):
            continue
        attribution = post.get("attribution")
        if isinstance(attribution, str) and attribution:
            profile_name = attribution.replace("Liked by ", "")
        else:
            profile_name = row[name_index]
        extracted_data.append((profile_name, 1))
    return extracted_data

# One (name, 1) pair per post, summed per name.
profile_likes_rdd = rdd_split.flatMap(extract_profile_likes)
likes_count_rdd = profile_likes_rdd.reduceByKey(lambda left, right: left + right)

# Every distinct raw name in the dataset, so profiles without likes show 0.
# Names that appear only in attributions (not in the name column) are not
# listed in the output.
all_profiles_rdd = rdd_split.map(lambda fields: fields[name_index]).distinct()

likes_count_dict = dict(likes_count_rdd.collect())
final_sorted_likes_count = sorted(
    ((profile, likes_count_dict.get(profile, 0)) for profile in all_profiles_rdd.collect()),
    key=lambda pair: pair[0],
)

print("Total Posts Liked Per Profile (Sorted by Name, Including 0 Likes):")
for position, (profile, count) in enumerate(final_sorted_likes_count, 1):
    print(f"{position}.{profile}: {count}")

In [None]:
# Extract education institutions from JSON data
def extract_education_profiles(row):
    """Return (institution title, 1) pairs for one row's 'education' JSON.

    A row contributes a single ("Institution Not Available", 1) when its
    education field is empty, unparseable, not a JSON list, or contains no
    JSON objects. Within a list, entries that are not JSON objects are
    skipped. An entry whose "title" is missing, null or empty is counted as
    "Institution Not Available".
    """
    raw_education = row[education_index]
    education_list = safe_json_loads(clean_string(raw_education)) if raw_education else []

    if not education_list or not isinstance(education_list, list):
        return [("Institution Not Available", 1)]

    pairs = []
    for edu in education_list:
        if not isinstance(edu, dict):
            continue
        # clean_string rewrites "" to null, so an empty title arrives as None.
        # A None key would later crash the alphabetical sort (None.lower()).
        title = edu.get("title")
        pairs.append((str(title) if title else "Institution Not Available", 1))
    return pairs or [("Institution Not Available", 1)]

# Map each row to (institution, 1) pairs and count them per institution.
education_count_rdd = rdd_split.flatMap(extract_education_profiles)
institution_counts_rdd = education_count_rdd.reduceByKey(lambda a, b: a + b)

# Every distinct institution is already a key of the counts. A separate
# distinct() pass would re-run the whole pipeline, and a lookup default of 0
# could never apply.
all_institutions_rdd = institution_counts_rdd.keys()

# Convert counts to a dictionary
institution_count_dict = dict(institution_counts_rdd.collect())

# Case-insensitive alphabetical order. str() keeps the sort from crashing if
# an extractor ever emits a non-string key (e.g. None from a null "title").
final_sorted_institutions = sorted(
    institution_count_dict.items(), key=lambda item: str(item[0]).lower()
)

# Print results
print("Total Profiles by Education Institution (Sorted Alphabetically):")
for i, (institution, count) in enumerate(final_sorted_institutions, start=1):
    print(f"{i}.{institution}: {count}")

In [None]:
# Count profiles per certification title. Each row's 'certifications' JSON is
# parsed exactly once (in extract_certification_profiles), so each malformed
# cell is reported once and empty cells are not reported at all.


def extract_certification_profiles(row):
    """Return (certification title, 1) pairs for one row's 'certifications' JSON.

    Rows whose field is empty, unparseable or not a JSON list contribute
    nothing. List entries that are not JSON objects are skipped. An entry
    whose "title" is missing, null or empty is counted as
    "Unknown certifications".
    """
    raw_certifications = row[certification_index]
    certification_list = (
        safe_json_loads(clean_string(raw_certifications)) if raw_certifications else []
    )
    if not isinstance(certification_list, list):
        return []

    extracted_data = []
    for certification in certification_list:
        if not isinstance(certification, dict):
            continue
        # clean_string rewrites "" to null, so an empty title arrives as None;
        # a None key would crash the case-insensitive sort below.
        title = certification.get("title")
        extracted_data.append((str(title) if title else "Unknown certifications", 1))
    return extracted_data


# Create RDD mapping certifications to profile counts
certification_count_rdd = rdd_split.flatMap(extract_certification_profiles)

# Aggregate counts for each certification
certification_counts_rdd = certification_count_rdd.reduceByKey(lambda a, b: a + b)

# Every distinct certification is already a key of the counts.
all_certifications_rdd = certification_counts_rdd.keys()

# Convert counts to a dictionary
certification_count_dict = dict(certification_counts_rdd.collect())

# Keys are always str here, so .lower() is safe.
final_sorted_certifications = sorted(
    certification_count_dict.items(), key=lambda item: item[0].lower()
)

# Print final sorted results
print("Total Profiles by certifications (Sorted Alphabetically):")
for i, (cert_name, count) in enumerate(final_sorted_certifications, start=1):
    print(f"{i}.{cert_name}: {count}")

In [None]:
# Function to parse and handle null values
def extract_recommendations(row):
    """Return (profile name, recommendation count) for one parsed row.

    The name is the raw name column. The count is parsed with clean_digit, so
    empty, placeholder (e.g. "null") or otherwise non-numeric values become 0.
    """
    name = row[name_index]
    # str.isdigit() accepts characters such as "²" that int() rejects, so
    # `int(x) if x.isdigit()` could raise ValueError and fail the Spark job.
    # clean_digit is the notebook's shared, non-raising count parser.
    recommendations = clean_digit(row[recommendations_index])
    return (name, recommendations)

# (name, recommendation count) per profile, ordered by name (case-sensitive).
recommendations_rdd = rdd_split.map(extract_recommendations).sortByKey()
recommendation_rows = recommendations_rdd.collect()

print("Total Recommendations per Profile (Sorted Alphabetically):")
for position, (name, count) in enumerate(recommendation_rows, 1):
    print(f"{position}.{name}: {count}")