In [6]:


! pip install --user redis





In [7]:
import redis
import json
import requests
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, from_json, struct
from pyspark.sql.types import StructType, StructField, StringType, IntegerType, BooleanType, TimestampType, DoubleType
# from pyspark.sql.redis import RedisDataFrameWriter

In [115]:
spark = SparkSession.builder.getOrCreate()
spark = SparkSession.builder.appName("API Joining").getOrCreate()


In [116]:
account_url = "https://xloop-dummy.herokuapp.com/account"
councillor_url = "https://xloop-dummy.herokuapp.com/councillor"
patient_url = "https://xloop-dummy.herokuapp.com/patient"


In [117]:
account_data = requests.get(account_url).json()
councillor_data = requests.get(councillor_url).json()
patient_data = requests.get(patient_url).json()

In [118]:
schema = StructType([
    StructField("id", IntegerType(), nullable=False),
    StructField("created", StringType(), nullable=False),
    StructField("updated", StringType(), nullable=False),
    StructField("email", StringType(), nullable=False),
    StructField("password", StringType(), nullable=False),
    StructField("first_name", StringType(), nullable=False),
    StructField("last_name", StringType(), nullable=False),
    StructField("gender", StringType(), nullable=False),
    StructField("phone_number", StringType(), nullable=False),
    StructField("address", StructType([
        StructField("address", StringType(), nullable=False),
        StructField("location", StructType([
            StructField("lat", DoubleType(), nullable=False),
            StructField("lng", DoubleType(), nullable=False)
        ])),
        StructField("placeId", StringType(), nullable=False),
        StructField("region", StringType(), nullable=False)
    ])),
    StructField("national_identity", StringType(), nullable=False),
    StructField("role", StringType(), nullable=False),
    StructField("is_active", StringType(), nullable=False)
])


In [119]:
account_df = spark.createDataFrame(account_data,schema)
councillor_df = spark.createDataFrame(councillor_data)
patient_df = spark.createDataFrame(patient_data)


In [120]:
account_df.printSchema()

root
 |-- id: integer (nullable = false)
 |-- created: string (nullable = false)
 |-- updated: string (nullable = false)
 |-- email: string (nullable = false)
 |-- password: string (nullable = false)
 |-- first_name: string (nullable = false)
 |-- last_name: string (nullable = false)
 |-- gender: string (nullable = false)
 |-- phone_number: string (nullable = false)
 |-- address: struct (nullable = true)
 |    |-- address: string (nullable = false)
 |    |-- location: struct (nullable = true)
 |    |    |-- lat: double (nullable = false)
 |    |    |-- lng: double (nullable = false)
 |    |-- placeId: string (nullable = false)
 |    |-- region: string (nullable = false)
 |-- national_identity: string (nullable = false)
 |-- role: string (nullable = false)
 |-- is_active: string (nullable = false)



In [121]:
location_schema = struct(
    col("address.location.lat").alias("lat"),
    col("address.location.lng").alias("lng")
)

In [122]:
selected_df = account_df.select("id", "address.location", "address.region")
selected_df.show(truncate=False)

+---+---------------------------------------+------+
|id |location                               |region|
+---+---------------------------------------+------+
|0  |{36.44304508252657, 60.876023921999035}|PK    |
|1  |{19.20048630158063, 72.58711960394703} |PK    |
|2  |{26.22412383124146, 60.930333428250265}|PK    |
|3  |{28.552212271780192, 65.70090668459534}|PK    |
|4  |{21.48841254263691, 65.29466790380093} |PK    |
|5  |{20.08490996562646, 68.279037242669}   |PK    |
|6  |{23.698474446703873, 66.58916837129675}|PK    |
|7  |{11.17532643717742, 65.37245761660222} |PK    |
|8  |{27.1916736388686, 69.28079244957337}  |PK    |
|9  |{16.509737146042216, 56.71376309118091}|PK    |
|10 |{21.53427314517241, 65.56749965860047} |PK    |
|11 |{17.98155427671901, 59.29928834431689} |PK    |
|12 |{25.564152857819003, 67.87519119529236}|PK    |
|13 |{25.08739302684949, 73.76956832921769} |PK    |
|14 |{17.108236096057514, 61.82117735129236}|PK    |
|15 |{25.224289086968014, 68.01153122632991}|P

In [123]:
selected_df = account_df.select(col("id"), col("address.region"),col("address.location.lat"), col("address.location.lng"))

# Show the selected data
selected_df.show(truncate=False)

+---+------+------------------+------------------+
|id |region|lat               |lng               |
+---+------+------------------+------------------+
|0  |PK    |36.44304508252657 |60.876023921999035|
|1  |PK    |19.20048630158063 |72.58711960394703 |
|2  |PK    |26.22412383124146 |60.930333428250265|
|3  |PK    |28.552212271780192|65.70090668459534 |
|4  |PK    |21.48841254263691 |65.29466790380093 |
|5  |PK    |20.08490996562646 |68.279037242669   |
|6  |PK    |23.698474446703873|66.58916837129675 |
|7  |PK    |11.17532643717742 |65.37245761660222 |
|8  |PK    |27.1916736388686  |69.28079244957337 |
|9  |PK    |16.509737146042216|56.71376309118091 |
|10 |PK    |21.53427314517241 |65.56749965860047 |
|11 |PK    |17.98155427671901 |59.29928834431689 |
|12 |PK    |25.564152857819003|67.87519119529236 |
|13 |PK    |25.08739302684949 |73.76956832921769 |
|14 |PK    |17.108236096057514|61.82117735129236 |
|15 |PK    |25.224289086968014|68.01153122632991 |
|16 |PK    |11.80043953379177 |

In [124]:
selected_df = account_df.select(col("id").alias("account_id"), col("address.region"), col("address.location.lat"), col("address.location.lng"))
patient_selected_df = patient_df.select("user_id", "id")
selected_df.show()

+----------+------+------------------+------------------+
|account_id|region|               lat|               lng|
+----------+------+------------------+------------------+
|         0|    PK| 36.44304508252657|60.876023921999035|
|         1|    PK| 19.20048630158063| 72.58711960394703|
|         2|    PK| 26.22412383124146|60.930333428250265|
|         3|    PK|28.552212271780192| 65.70090668459534|
|         4|    PK| 21.48841254263691| 65.29466790380093|
|         5|    PK| 20.08490996562646|   68.279037242669|
|         6|    PK|23.698474446703873| 66.58916837129675|
|         7|    PK| 11.17532643717742| 65.37245761660222|
|         8|    PK|  27.1916736388686| 69.28079244957337|
|         9|    PK|16.509737146042216| 56.71376309118091|
|        10|    PK| 21.53427314517241| 65.56749965860047|
|        11|    PK| 17.98155427671901| 59.29928834431689|
|        12|    PK|25.564152857819003| 67.87519119529236|
|        13|    PK| 25.08739302684949| 73.76956832921769|
|        14|  

In [126]:
joined_df = selected_df.join(patient_selected_df, selected_df.account_id == patient_selected_df.user_id)
joined_df.show()

[Stage 70:===>                                                    (1 + 15) / 16]

+----------+------+------------------+------------------+-------+---+
|account_id|region|               lat|               lng|user_id| id|
+----------+------+------------------+------------------+-------+---+
|        26|    PK|24.327297126232867| 65.20061345408942|     26| 26|
|        29|    PK|12.232047819667587| 70.89322806825771|     29| 29|
|       474|    PK|27.435004506579254| 62.27722683310867|    474|474|
|        65|    PK| 20.29138518234734| 72.91718310716317|     65| 65|
|       191|    PK| 33.81867208665051| 69.68087277391533|    191|191|
|       418|    PK| 34.57123272096376|61.791972041704526|    418|418|
|       541|    PK| 25.18686684910276| 66.68729541025937|    541|541|
|       558|    PK| 25.10865230088019| 65.45057709574826|    558|558|
|       222|    PK| 24.65563866846733| 66.93158715211189|    222|222|
|       270|    PK| 32.21766935475883|  66.0473222761342|    270|270|
|       293|    PK| 19.08680981771938| 72.08526096311243|    293|293|
|       243|    PK| 

                                                                                

In [127]:
joined_df_1 = selected_df.join(councillor_selected_df, selected_df.account_id == councillor_selected_df.user_id)
joined_df_1.show()

                                                                                

+----------+------+------------------+------------------+-------+----+
|account_id|region|               lat|               lng|user_id|  id|
+----------+------+------------------+------------------+-------+----+
|      2927|    PK|  30.6517477182322| 64.17921141051397|   2927|9510|
|      1950|    PK|21.945529433218823| 68.05776368171382|   1950| 824|
|      3091|    PK|11.744821994563832|  72.6987232288003|   3091|8955|
|      8209|    PK| 38.01697384949846| 64.10336278619866|   8209| 884|
|      1360|    PK| 38.34843891302184| 66.18160828416444|   1360| 686|
|      7635|    PK|25.056919852803265| 68.05527435581268|   7635|7832|
|      3845|    PK|13.672994224905649| 68.87425449048067|   3845|2339|
|      6856|    PK| 16.55760014062568| 55.65488048391353|   6856|2376|
|      3327|    PK| 36.63012666529774| 67.71350445458938|   3327|6832|
|      4191|    PK|27.110686020931823| 65.42889840019302|   4191|3248|
|      7146|    PK|24.963770198021244| 72.45101603223668|   7146|7164|
|     

In [128]:
councillor_selected_df = councillor_df.select("user_id", "id")
patient_selected_df = patient_df.select("user_id", "id")

In [129]:
joined_df = councillor_selected_df.join(patient_selected_df, "user_id")

In [41]:
joined_df.show()

[Stage 20:>                                                       (0 + 16) / 16]

+-------+----+----+
|user_id|  id|  id|
+-------+----+----+
|   6834|6738|3158|
|   6834|9625|3158|
|   4126|1890|9018|
|   4126|5232|9018|
|   6892|9911|2657|
|   3061|3526|5399|
|   5148|3797|3829|
|   7664|1730|2975|
|   3741|2951|8776|
|   2570|9619|8987|
|   2517|2821|1564|
|   6489|3864|4803|
|   8872|4473|8002|
|   8872|8810|8002|
|   9686| 360|3289|
|   9686|7102|3289|
|   9686|3653|3289|
|   9686|5730|3289|
|   1447|9235|6768|
|   1447|1139|6768|
+-------+----+----+
only showing top 20 rows



                                                                                

In [130]:
import redis

# Create a Redis client
redis_host = 'localhost'  # Replace with your Redis host
redis_port = 6379  # Replace with your Redis port
redis_client = redis.Redis(host=redis_host, port=redis_port)

# Set a key-value pair in Redis
redis_client.set('data_councellor', joined_df_1)

DataError: Invalid input of type: 'DataFrame'. Convert to a bytes, string, int or float first.