#### Initialize Spark with Delta Lake

In [None]:
from delta import * 
import os 
import pyspark
import pyspark.sql.functions as F

# Object-store (MinIO, S3-compatible) and cluster settings. Each value can be
# overridden through an environment variable so credentials need not be
# committed with the notebook; the fallbacks are the local development values.
S3_ENDPOINT = os.environ.get("S3_ENDPOINT", "http://minio:9000")
S3_ACCESS_KEY = os.environ.get("S3_ACCESS_KEY", "minio")
S3_SECRET_KEY = os.environ.get("S3_SECRET_KEY", "minio123")
SPARK_MASTER_URL = os.environ.get("SPARK_MASTER_URL", "spark://spark-master:7077")

conf = (
    pyspark.conf.SparkConf()
    .setAppName("LinkedIn Processor 3")
    # Delta Lake SQL support: catalog implementation + session extension.
    .set(
        "spark.sql.catalog.spark_catalog",
        "org.apache.spark.sql.delta.catalog.DeltaCatalog",
    )
    .set("spark.sql.extensions", "io.delta.sql.DeltaSparkSessionExtension")
    # S3A filesystem pointed at MinIO; path-style access addresses buckets as
    # <endpoint>/<bucket> instead of a bucket sub-domain.
    .set("spark.hadoop.fs.s3a.endpoint", S3_ENDPOINT)
    .set("spark.hadoop.fs.s3a.access.key", S3_ACCESS_KEY)
    .set("spark.hadoop.fs.s3a.secret.key", S3_SECRET_KEY)
    .set("spark.hadoop.fs.s3a.path.style.access", "true")
    # Small shuffle partition count (Spark's default is 200) for small inputs.
    .set("spark.sql.shuffle.partitions", "4")
    .setMaster(SPARK_MASTER_URL)
)

# Maven packages needed for s3a:// paths; configure_spark_with_delta_pip adds
# them alongside the Delta package. hadoop-aws and hadoop-common are pinned to
# the same version. NOTE(review): should match the cluster's Hadoop version.
extra_packages = [
    "org.apache.hadoop:hadoop-aws:3.3.4",
    "org.apache.hadoop:hadoop-common:3.3.4",
    "com.amazonaws:aws-java-sdk-bundle:1.12.262",
]

# The application name is set only in `conf`: an appName(...) call placed before
# config(conf=conf) would be overwritten by the conf's spark.app.name anyway.
builder = pyspark.sql.SparkSession.builder.config(conf=conf)
spark = configure_spark_with_delta_pip(
    builder, extra_packages=extra_packages
).getOrCreate()


In [None]:
# Display the active SparkSession (rich notebook repr) as a quick check that the
# session started and is attached to the expected master/application.
spark

#### Create invitations table

In [3]:
# Mapping from the raw Invitations.csv headers to the snake_case column names
# of the processed Delta table.
INVITATION_COLUMNS = {
    "From": "sender",
    "To": "recipient",
    "Sent At": "sent_at",
    "Message": "message",
    "Direction": "direction",
    "inviterProfileUrl": "sender_profile_url",
    "inviteeProfileUrl": "recipient_profile_url",
}

# Read the raw export once. `Message` is free text, so multiLine keeps quoted
# values containing line breaks in one record, and escape='"' parses doubled
# quotes ("") inside quoted values.
# NOTE(review): assumes the export uses RFC 4180 quoting — confirm.
invitations_raw = (
    spark.read
    .option("header", "true")
    .option("multiLine", "true")
    .option("escape", '"')
    .csv("s3a://raw/linkedin/Invitations.csv")
)

invitations = invitations_raw
for source_name, target_name in INVITATION_COLUMNS.items():
    # withColumnRenamed is a no-op when the source column is absent.
    invitations = invitations.withColumnRenamed(source_name, target_name)

invitations.write.format("delta").mode("overwrite").save("s3a://processed/linkedin/invitations")


24/09/06 12:58:48 WARN MetricsConfig: Cannot locate configuration: tried hadoop-metrics2-s3a-file-system.properties,hadoop-metrics2.properties
24/09/06 12:58:54 WARN SparkStringUtils: Truncated the string representation of a plan since it was too large. This behavior can be adjusted by setting 'spark.sql.debug.maxToStringFields'.
                                                                                

#### Create messages table

In [4]:
# Mapping from the raw messages.csv headers to snake_case column names.
# NOTE(review): "RECIPIENT PROFILE URLS" keeps the singular target name used by
# the existing table, although the header suggests it may hold several URLs.
MESSAGE_COLUMNS = {
    "CONVERSATION TITLE": "conversation_title",
    "FROM": "sender",
    "SENDER PROFILE URL": "sender_profile_url",
    "RECIPIENT PROFILE URLS": "recipient_profile_url",
    "SUBJECT": "subject",
    "CONTENT": "content",
    "FOLDER": "folder",
    "TO": "recipient",
    "DATE": "date",
}

# CONTENT is free-form message text: multiLine keeps quoted values containing
# line breaks in one record, and escape='"' parses doubled quotes ("").
# NOTE(review): assumes the export uses RFC 4180 quoting — confirm.
messages_raw = (
    spark.read
    .option("header", "true")
    .option("multiLine", "true")
    .option("escape", '"')
    .csv("s3a://raw/linkedin/messages.csv")
)

# The conversation id is not kept in the processed table.
messages = messages_raw.drop("CONVERSATION ID")
for source_name, target_name in MESSAGE_COLUMNS.items():
    # withColumnRenamed is a no-op when the source column is absent.
    messages = messages.withColumnRenamed(source_name, target_name)

messages.write.format("delta").mode("overwrite").save("s3a://processed/linkedin/messages")
  

                                                                                

#### Create connections table

In [5]:
# Mapping from the raw Connections.csv headers to snake_case column names.
CONNECTION_COLUMNS = {
    "First Name": "first_name",
    "Last Name": "last_name",
    "Email Address": "email",
    "Company": "company",
    "Position": "position",
    "Connected On": "connected_on",
    "URL": "profile_url",
}

connections_raw = (
    spark.read
    .option("header", "true")
    .csv("s3a://raw/linkedin/Connections.csv")
)

connections = connections_raw
for source_name, target_name in CONNECTION_COLUMNS.items():
    # withColumnRenamed is a no-op when the source column is absent.
    connections = connections.withColumnRenamed(source_name, target_name)

# concat_ws skips null parts, so a connection with only a first or only a last
# name still gets a full_name (F.concat would return null for it). If both
# parts are null, concat_ws yields "", which is mapped back to null.
full_name_expr = F.concat_ws(" ", "first_name", "last_name")
connections = connections.withColumn(
    "full_name",
    F.when(full_name_expr != "", full_name_expr),
)

connections.write.format("delta").mode("overwrite").save("s3a://processed/linkedin/connections")

# Keep the original variable name available for any later cells that use it.
df = connections
