<a href="https://colab.research.google.com/github/AlexandreFleutelot/EWT_ESN/blob/main/pyspark_test01.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [None]:
!pip install pyspark



In [None]:
from pyspark.sql import SparkSession

# Local-mode session (master "local" = a single worker thread); the Spark UI
# is moved to port 4050.
spark = (
    SparkSession.builder
    .master("local")
    .appName("Colab")
    .config("spark.ui.port", "4050")
    .getOrCreate()
)

In [None]:
# Evaluate the session as the cell's last expression so the notebook renders its summary.
spark

In [None]:
from pyspark import SparkFiles

# Public demo file: a small ';'-separated CSV of usernames.
url = 'https://support.staffbase.com/hc/en-us/article_attachments/360009197031/username.csv'

# addFile fetches the URL for this Spark job; SparkFiles.get resolves the local copy by file name.
spark.sparkContext.addFile(url)
df = (
    spark.read
    .option("header", True)
    .option("sep", ';')
    .option("inferSchema", True)
    .csv(SparkFiles.get("username.csv"))
)

In [None]:
# Preview up to 10 rows of the loaded frame.
df.show(n=10)

+---------+-----------+----------+---------+
| Username| Identifier|First name|Last name|
+---------+-----------+----------+---------+
| booker12|       9012|    Rachel|   Booker|
|   grey07|       2070|     Laura|     Grey|
|johnson81|       4081|     Craig|  Johnson|
|jenkins46|       9346|      Mary|  Jenkins|
|  smith79|       5079|     Jamie|    Smith|
+---------+-----------+----------+---------+



In [None]:
# Confirm that spark.read.csv returned a pyspark DataFrame (not an RDD).
type(df)

pyspark.sql.dataframe.DataFrame

In [None]:
# Inspect the inferred column types. Note that the header " Identifier" carries a
# leading space, so it must be cleaned up before it can be referenced as "Identifier".
df.printSchema()

root
 |-- Username: string (nullable = true)
 |--  Identifier: integer (nullable = true)
 |-- First name: string (nullable = true)
 |-- Last name: string (nullable = true)



In [None]:
# The CSV header contains stray whitespace (printSchema shows " Identifier").
# Strip it from every column name rather than hard-coding one rename, so any other
# padded header is fixed too. Columns without padding keep their names unchanged.
df = df.toDF(*[name.strip() for name in df.columns])

In [None]:
# Project two columns and show the first 5 rows.
df.select("Username", "Identifier").show(n=5)

+---------+----------+
| Username|Identifier|
+---------+----------+
| booker12|      9012|
|   grey07|      2070|
|johnson81|      4081|
|jenkins46|      9346|
|  smith79|      5079|
+---------+----------+



In [None]:
# Keep only users whose Identifier exceeds 9000.
df.where(df["Identifier"] > 9000).show()

+---------+----------+----------+---------+
| Username|Identifier|First name|Last name|
+---------+----------+----------+---------+
| booker12|      9012|    Rachel|   Booker|
|jenkins46|      9346|      Mary|  Jenkins|
+---------+----------+----------+---------+



In [None]:


# Hand-made rows whose second field is a Python list, to show how PySpark
# builds and renders an array column.
test = [
    ("bob", ["red", "blue"]),
    ("maria", ["green", "red"]),
    ("sue", ["black"]),
]
df2 = spark.createDataFrame(test, ["name", "colors"])
df2.show()

+-----+------------+
| name|      colors|
+-----+------------+
|  bob| [red, blue]|
|maria|[green, red]|
|  sue|     [black]|
+-----+------------+



In [None]:
import pyspark.sql.functions as F
from pyspark.sql.types import *
import random

c = ["red", "blue", "green", "black"]

def rndColor():
  """Return one colour from ``c``, chosen with ``random.choice``."""
  return random.choice(c)

# Spark treats UDFs as deterministic by default and may evaluate them more than once
# or reorder them. This one returns a random value, so mark it nondeterministic to keep
# the optimizer from assuming repeated calls give the same result.
# NOTE: `random` is not seeded, so the colours differ between runs.
udf_func = F.udf(rndColor, StringType()).asNondeterministic()
df3 = df.withColumn("color", udf_func())
df3.show()

+---------+----------+----------+---------+-----+
| Username|Identifier|First name|Last name|color|
+---------+----------+----------+---------+-----+
| booker12|      9012|    Rachel|   Booker|black|
|   grey07|      2070|     Laura|     Grey|black|
|johnson81|      4081|     Craig|  Johnson|green|
|jenkins46|      9346|      Mary|  Jenkins| blue|
|  smith79|      5079|     Jamie|    Smith|  red|
+---------+----------+----------+---------+-----+



In [None]:
# Check that dr.csv and msgs.csv, which are read from /content below, are present.
!ls /content/

dr.csv	drive  msgs.csv  sample_data


In [None]:
# Directory holding the input files (the Colab workspace listed above); change it
# here instead of editing every path.
DATA_DIR = "/content"

# dr.csv is ';'-separated. CSV has no array type, so TRN_LIST is read as a plain
# string such as "[1,3,4]", not as an array of TRN ids.
df_dr = spark.read.csv(f"{DATA_DIR}/dr.csv", header=True, sep=';', inferSchema=True)
# msgs.csv is tab-separated: one row per TRN with its Field-* columns.
df_msgs = spark.read.csv(f"{DATA_DIR}/msgs.csv", header=True, sep='\t', inferSchema=True)

df_dr.show()
df_msgs.show()

+-------+-----------+
|CTX_NUM|   TRN_LIST|
+-------+-----------+
|      1|    [1,3,4]|
|      2|[2,5,6,7,9]|
|      3|    [10,11]|
|      4|        [8]|
+-------+-----------+

+---+-----------+-----------+-----------+-----------+-----------+-----------+-----------+
|TRN|   Field-01|   Field-02|   Field-03|   Field-04|   Field-05|   Field-06|   Field-07|
+---+-----------+-----------+-----------+-----------+-----------+-----------+-----------+
|  1| Field-01-1| Field-02-1| Field-03-1| Field-04-1| Field-05-1| Field-06-1| Field-07-1|
|  2| Field-01-2| Field-02-2| Field-03-2| Field-04-2| Field-05-2| Field-06-2| Field-07-2|
|  3| Field-01-1| Field-02-1| Field-03-1| Field-04-1| Field-05-3| Field-06-3| Field-07-3|
|  4| Field-01-1| Field-02-1| Field-03-1| Field-04-1| Field-05-4| Field-06-4| Field-07-4|
|  5| Field-01-2| Field-02-2| Field-03-2| Field-04-5| Field-05-5| Field-06-5| Field-07-5|
|  6| Field-01-6| Field-02-6| Field-03-6| Field-04-6| Field-05-6| Field-06-6| Field-07-6|
|  7| Field-0

In [None]:
# TRN_LIST is a string like "[1,3,4]" (CSV has no array type). A substring test
# such as TRN_LIST.contains(TRN) is therefore wrong: TRN=1 also matches "[10,11]".
# Parse the string into an int array, explode it to one row per TRN, and
# equi-join on TRN. This is exact and avoids a cross join followed by a filter.
trn_values = F.split(F.regexp_replace("TRN_LIST", r"[\[\]\s]", ""), ",").cast("array<int>")
joinedRDD = (
    df_dr.withColumn("TRN", F.explode(trn_values))
    .join(df_msgs, on="TRN", how="inner")
    # Keep the previous column order: df_dr columns, TRN, then the message fields.
    .select(*df_dr.columns, "TRN", *[col for col in df_msgs.columns if col != "TRN"])
)
joinedRDD.show()

+-------+-----------+---+-----------+-----------+-----------+-----------+-----------+-----------+-----------+
|CTX_NUM|   TRN_LIST|TRN|   Field-01|   Field-02|   Field-03|   Field-04|   Field-05|   Field-06|   Field-07|
+-------+-----------+---+-----------+-----------+-----------+-----------+-----------+-----------+-----------+
|      1|    [1,3,4]|  1| Field-01-1| Field-02-1| Field-03-1| Field-04-1| Field-05-1| Field-06-1| Field-07-1|
|      3|    [10,11]|  1| Field-01-1| Field-02-1| Field-03-1| Field-04-1| Field-05-1| Field-06-1| Field-07-1|
|      2|[2,5,6,7,9]|  2| Field-01-2| Field-02-2| Field-03-2| Field-04-2| Field-05-2| Field-06-2| Field-07-2|
|      1|    [1,3,4]|  3| Field-01-1| Field-02-1| Field-03-1| Field-04-1| Field-05-3| Field-06-3| Field-07-3|
|      1|    [1,3,4]|  4| Field-01-1| Field-02-1| Field-03-1| Field-04-1| Field-05-4| Field-06-4| Field-07-4|
|      2|[2,5,6,7,9]|  5| Field-01-2| Field-02-2| Field-03-2| Field-04-5| Field-05-5| Field-06-5| Field-07-5|
|      2|[

In [None]:
# Number of distinct (Field-01, Field-02) pairs per context.
# groupBy output rows come back in no guaranteed order, so sort by CTX_NUM
# to make the displayed table reproducible across runs.
result = (
    joinedRDD.groupBy("CTX_NUM")
    .agg(F.countDistinct("Field-01", "Field-02"))
    .orderBy("CTX_NUM")
)
result.show()

+-------+-------------------------+
|CTX_NUM|count(Field-01, Field-02)|
+-------+-------------------------+
|      1|                        1|
|      3|                        2|
|      4|                        1|
|      2|                        3|
+-------+-------------------------+

