# Kết nối tới neo4j bằng pyspark

## Ta sử dụng spark neo4j connector : [Link documentation](https://neo4j.com/developer/spark/overview/)


## Hướng dẫn cách cài đặt và sử dụng:
### 1) Tải pyspark thông qua pip hoặc conda
### 2) Cài đặt Apache Spark. 
### 3) Tải connector file (jar file)
- Đối với spark 3+ thì ta tải file *neo4j-connector-apache-spark_2.12-4.0.1_for_spark_3.jar"*

<img src='./image/Screenshot 2021-08-30 135735.png'/>

### 4) configure spark để kết nối tới neo4j
- Ở dòng code dưới, phần config(param1, param2) thì param1 = "spark.jars", còn param2 là đường dẫn tới file jar vừa tải

In [1]:
from pyspark.sql import SparkSession

spark = SparkSession.builder.config("spark.jars", "neo4j-connector-apache-spark_2.12-4.0.1_for_spark_3.jar").getOrCreate()



## Connector Options & Configuration

 *.option("url", "bolt://localhost:7687")* **Kết nối tới neo4j database đang hoạt động** <br>
 
 *.option("authentication.type", "basic")* **Sử dụng basic authentication với username hoặc password. Ngoài ra còn có 1**
 
 **số phương thực authenticate khác như Kerberos hay custom authentication** <br>
  
 *.option("authentication.basic.username", "neo4j")* **Nhập username của database** <br>
 
 *.option("authentication.basic.password", "123456")\* **Nhập password của database** <br>
 



## Read from Neo4j
- ### Dùng lệnh spark.read.format() để đọc dữ liệu từ neo4j
- ### Ở ví dụ dưới là câu lệnh lấy nodes có labels là Person: <br>
 
     - *option("labels", "Persion")*: **Lấy những nodes có labels là Person**

In [9]:
df = spark.read.format("org.neo4j.spark.DataSource")\
 .option("url", "bolt://localhost:7687")\
 .option("authentication.type", "basic")\
 .option("authentication.basic.username", "neo4j")\
 .option("authentication.basic.password", "123456")\
 .option("labels", "Person")\
 .load()
df.limit(5).show()


+----+--------+--------------------+-------------+
|<id>|<labels>|               title|imdb_title_id|
+----+--------+--------------------+-------------+
|   0|[Person]|Tarp musu, mergaiciu|    tt9591082|
|   1|[Person]|           So Pretty|    tt9595506|
|   2|[Person]|           Da ren wu|    tt9596638|
|   3|[Person]|   Fei chi ren sheng|    tt9597190|
|   4|[Person]|    Spasti Leningrad|    tt9598172|
+----+--------+--------------------+-------------+



### Ngoài ra, ta có thể sử dụng custom query để thực hiện truy vấn 
**Bằng cách dùng** *Option("query", your_query)*
<img src='./image/Screenshot 2021-08-30 143654.png'/>

## Để thuận tiện cho việc test, ta tạo 2 dataframe đơn giản

In [24]:
df_P = spark.createDataFrame(
 [(1, "John", 2),(2, "Tuan", 1)],
 ["actor_id", "name", "movie_id_acted"]
)

df_M = spark.createDataFrame(
 [(1, "Avatar"),(2, "Get out")],
 ["movie_id", "title"]
)


## Fucntion check_valid_string() dùng để kiểm tra tính hợp lệ của 1 string. 
- Ví dụ như khi tạo 1 node ta phải đặt tên labels của node đấy. Nếu người dùng đặt tên labels không hợp lệ như cho giá trị là Null thì tất nhiên cypher query để khởi tạo node sẽ bị lỗi. Vậy nên ta cần phải có hàm kiểm tra đầu vào của các parameter
- Nếu ta tạo web và để người dùng nhập input tại web, ta có thể kiểm soát input của người dùng. Tuy nhiên nếu người dùng sử dụng api để gọi hàm, thì ta không thể kiểm soát được là người dùng sẽ nhập gì. Vậy nên có 1 hàm validator để đảm bảo input được hợp lệ.

In [13]:
def check_valid_string(text):
    if type(text) is str:
        if len(text) > 0 :
            return True, "Success"
        return False, "String should have its length > 0"
    return False, "Your input is not string"
check_valid_string(34)

(False, 'Your input is not string')

## Tạo query dưới sang string (Node create)
- Ở đây em đặt tên properties của node là tên của cột trong dataframe cho thuận tiện 

In [21]:
# Ta có 3 parameter là df, label và mode
# - df: là dataframe mà ta định import vào neo4j
# - label: Tên label mà ta định tạo
# - mode: CREATE hay MERGE

def create_node_query(df, label, mode):
    try:
        # Check label input
        check_str, res_message = check_valid_string(label)
        if check_str == False:
            raise Exception("Label input: " + res_message)
        
        # Check mode input
        check_str, res_message = check_valid_string(mode)
        if check_str == False:
            raise Exception("Mode input: " + res_message)  
            
        columns = df.columns
        query_arr = []
        for col in columns:
            query_arr.append(col + ":event."+col)

        query = ",".join(query_arr)
        query = "{" + query + "}"
        query = f'{mode} (m:{label} {query})'
        return True, query
    except Exception as e: 
        return False, str(e)

check, query_true = create_node_query(df_P, "Person", "Create")
print(f"Query đúng: {query_true}")

check, query_false1 = create_node_query(df_P, None, "Create")
print(f"Query sai 1: {query_false1}")

check, query_false1 = create_node_query(df_P, "Person", "")
print(f"Query sai 2: {query_false1}")

Query đúng: Create (m:Person {actor_id:event.actor_id,name:event.name,movie_id_acted:event.movie_id_acted})
Query sai 1: Label input: Your input is not string
Query sai 2: Mode input: String should have its length > 0


**Nhìn ở dòng query được khởi tạo trên, ta thấy có: *"actor_id: event.actor_id"*, vậy event ở đây là gì**

In [26]:
df_P = spark.createDataFrame(
 [(1, "John", 2),(2, "Tuan", 1)],
 ["actor_id", "name", "movie_id_acted"]
)
df_P.show()

+--------+----+--------------+
|actor_id|name|movie_id_acted|
+--------+----+--------------+
|       1|John|             2|
|       2|Tuan|             1|
+--------+----+--------------+



**Mỗi 1 dòng trong dataframe coi như là 1 thực thể, 1 node trong neo4j. Còn các columns trong dataframe là các property của thực thể đó. Khi thực hiện lệnh Write thì Connector sẽ đọc từng dòng trong dataframe và gán nó vào biến event. Như vậy** *event.actor_id"* **tức là lấy giá trị của columns actor_id trong dataframe**

<img src='./image/Screenshot 2021-08-30 152105.png'/>

## Credential chứa username, password và url của database

In [29]:
credential = {
    "url" : "bolt://localhost:7687",
    "username": "neo4j",
    "password": "123456"
}

## Tạo node

In [51]:
def create_node(df, label, mode, credential):
    try:
        #get query
        check_query, result = create_node_query(df, label, mode)
        #if query is not valid, throw exception
        if check_query == False:
            raise Exception(result)
            
        df.write.format("org.neo4j.spark.DataSource")\
         .option("url", credential['url'])\
         .option("authentication.type", "basic")\
         .option("authentication.basic.username", credential['username'])\
         .option("authentication.basic.password", credential['password'])\
         .option("query", result)\
         .mode("Overwrite")\
         .save()
        print("Success")
    except Exception as e: 
        print(e)

create_node(df_P, "Person", "Merge", credential) # Trường hợp thành công
create_node(df_M, "Movie", "Merge", credential) # Trường hợp thành công
create_node(df_P, "Person", "", credential) # Trường hợp lỗi

Success
Success
Mode input: String should have its length > 0


### Kết quả sau khi khởi tạo

<img src='./image/Screenshot 2021-08-30 153145.png'/>

## Tạo Relationship
- Ví dụ ta cần tạo relationship giữa Person và Movie, thì ta cần 1 dataframe mới chứa 2 columns là 2 key để xác định Node được kết nối, và các columns còn lại là các properties của relationship đó.

In [55]:
join_df = df_P.join(df_M, df_P.movie_id_acted ==  df_M.movie_id,"inner")
join_df.show()

+--------+----+--------------+--------+-------+
|actor_id|name|movie_id_acted|movie_id|  title|
+--------+----+--------------+--------+-------+
|       2|Tuan|             1|       1| Avatar|
|       1|John|             2|       2|Get out|
+--------+----+--------------+--------+-------+



In [45]:
# Hàm tạo relationship cần các tham số
# - label1: Label name của node 1
# - label2: Label name của node 2
# - key1: giá trị de xác định node 1
# - key2: giá trị de xác định node 2
# - rel_type: Tên của 
# - props_list: tên các columns dungf dđể làm properties của relationship
# - mode: Create hay Merge

#Ví dụ
dict_var = {
    "label1": "Person",
    "label2": "Movie",
    "key1": "movie_id_acted",
    "key2": "movie_id",
    "rel_type": "ACT_IN",
    "props_list": ["name"],
    "mode": "Merge",
}

false_dict1 = {
    "label1": "Person",
    "label2": "Movie",
    "key1": "movie_id_acted",
    "key2": "movie_id",
    "rel_type": "ACT_IN",
    "props_list": "23",
    "mode": "Merge"
}

false_dict2 = {
    "label1": "Person",
    "label2": "Movie",
    "key1": "movie_id_acted",
    "key2": 23,
    "rel_type": "ACT_IN",
    "props_list": ['name'],
    "mode": "Merge",
}

In [52]:
def create_relationship_query(dict_var):
    label1 = dict_var["label1"]
    label2 = dict_var["label2"]
    key1 = dict_var["key1"]
    key2 = dict_var["key2"]
    rel_type = dict_var["rel_type"]
    props_list = dict_var["props_list"]
    mode = dict_var["mode"]
    
    try:
        dict_check = {key: value for key, value in dict_var.items() if key != "props_list"}
        for key, value in dict_check.items():
            if value is None:
                raise Exception("Error in "+ key + ": Your input is Null")
            check_str, res_message = check_valid_string(value)
            if check_str == False:
                raise Exception("Error in "+ key +": " + res_message)
                
        if isinstance(props_list, list) == False:
            raise Exception("props_list is not a list")
            
        if len(props_list) > 0:
            prop_query_set = []
            for prop in props_list:
                prop_query_set.append(prop + ":event."+ prop)

            props = ",".join(prop_query_set)
            props = "{" + props + "}"
        else:
            props = ""
            
        query = f"""
            MATCH
              (a:{label1}),
              (b:{label2})
            WHERE a.{key1} = event.{key1} AND b.{key2} = event.{key2}
            {mode} (a)-[r:{rel_type} {props}]->(b)
        """
        return True, query
    
    except Exception as e: 
        return False, str(e)
    
check_query, query = create_relationship_query(dict_var)
print(f'Query dđúng: {query}')

check_query, query = create_relationship_query(false_dict1)
print(f'Query sai 1: {query}')

check_query, query = create_relationship_query(false_dict2)
print(f'Query sai 2: {query}')

Query dđúng: 
            MATCH
              (a:Person),
              (b:Movie)
            WHERE a.movie_id_acted = event.movie_id_acted AND b.movie_id = event.movie_id
            Merge (a)-[r:ACT_IN {name:event.name}]->(b)
        
Query sai 1: props_list is not a list
Query sai 2: Error in key2: Your input is not string


In [56]:
def create_relationship(df, dict_var, credential):
    try:
        check_query, result = create_relationship_query(dict_var)
        if check_query == False:
            raise Exception(result)
            
        df.write.format("org.neo4j.spark.DataSource")\
         .option("url", credential['url'])\
         .option("authentication.type", "basic")\
         .option("authentication.basic.username", credential['username'])\
         .option("authentication.basic.password", credential['password'])\
         .option("query", result)\
         .mode("Overwrite")\
         .save()
        print("Success")
    except Exception as e: 
        print(e)
        
create_relationship(join_df, dict_var, credential)

Success


<img src='./image/Screenshot 2021-08-30 155714.png'/>

## Bây h ta sẽ dùng 1 bộ dataset lớn hơn để thử tốc độ import
- Sử dụng IMDb movies extensive dataset với hơn 81k movies và 175k+ cast members
- Link tải dataset: [Link](https://www.kaggle.com/stefanoleone992/imdb-extensive-dataset)

In [58]:
df_movie = spark.read.csv("../../Datasets/IMDb movies.csv", header=True)
df_example = df_movie.select(df_movie.columns[:2])
print((df_example.count(), len(df_example.columns)))
df_example.show()

(85855, 2)
+-------------+--------------------+
|imdb_title_id|               title|
+-------------+--------------------+
|    tt0000009|          Miss Jerry|
|    tt0000574|The Story of the ...|
|    tt0001892|      Den sorte drøm|
|    tt0002101|           Cleopatra|
|    tt0002130|           L'Inferno|
|    tt0002199|From the Manger t...|
|    tt0002423|      Madame DuBarry|
|    tt0002445|          Quo Vadis?|
|    tt0002452|Independenta Roma...|
|    tt0002461|         Richard III|
|    tt0002646|            Atlantis|
|    tt0002844|Fantômas - À l'om...|
|    tt0003014|Il calvario di un...|
|    tt0003037|Juve contre Fantômas|
|    tt0003102|Ma l'amor mio non...|
|    tt0003131|Maudite soit la g...|
|    tt0003165|     Le mort qui tue|
|    tt0003167|      Amore di madre|
|    tt0003419|Lo studente di Praga|
|    tt0003471|    Traffic in Souls|
+-------------+--------------------+
only showing top 20 rows



In [94]:
import timeit

start = timeit.default_timer()

create_node(df_example, "Person", "Merge", credential)

stop = timeit.default_timer()

print('Time: ', stop - start)  

# create_node(df_M, "Movie", "Merge", credential)

Success
Time:  198.87998609999977


## Ta thấy thời gian để import 85k row khoảng tầm 3 phút hơn

# Improving Performance:
## 1) Tune your Batch Size:
<img src='./image/Screenshot 2021-08-30 160531.png'/>

## 2) Tune your Neo4j Memory Configuration
<img src='./image/Screenshot 2021-08-30 160621.png'/>

## 3) Have the Right Indexes
<img src='./image/Screenshot 2021-08-30 160719.png'/>

## 4) Tune Your Parallelism
<img src='./image/Screenshot 2021-08-30 160849.png'/>
<img src='./image/Screenshot 2021-08-30 160859.png'/>
<br>

## Link gốc: [Link](https://neo4j.com/developer/spark/architecture/)