In [12]:
import plyvel

In [13]:
url_database = 'url_db/'
already_have_database = "already_have_db/"

In [14]:
db = plyvel.DB(url_database, create_if_missing=True)

In [15]:
db.get(b"SDJLFKSDJ:FLS")


In [16]:
db.close()

In [17]:
def getAlreadyHave(url, database = already_have_database):
    if type(url) != bytes:
        url = url.encode()
    try:
        db = plyvel.DB(database, create_if_missing=True)
        shorted_url = db.get(url)
        db.close()
        return shorted_url
    except Exception as ee:
        import traceback
        import sys
        exc_type, exc_value, exc_tb = sys.exc_info()
        result = "".join(traceback.format_exception(exc_type, exc_value, exc_tb))
        print(result)
        print(f"Database conflicting when reading {url}")
        time.sleep(0.1)
        db.close()
        return getAlreadyHave(url)
def writeAlreadyHave(origin_url, shorten_url, database = already_have_database):
    key = origin_url
    value = shorten_url
    if type(key) != bytes:
        key = key.encode()
    if type(value) != bytes:
        value = value.encode()
    try:
        db = plyvel.DB(database, create_if_missing=True)
        if not db.get(key) or overwrite:
            db.put(key, value)
            success = True
        else:
            success = False
        db.close()
        return {"success": success}
    except Exception as ee:
        import traceback
        import sys
        exc_type, exc_value, exc_tb = sys.exc_info()
        result = "".join(traceback.format_exception(exc_type, exc_value, exc_tb))
        print(result)
        print(f"Database conflicting when writing {key}:{value}")
        time.sleep(0.1)
        db.close()
        return db_write(key, value)

In [18]:
import time
def db_write(key, value, overwrite = False, database = url_database):
    if type(key) != bytes:
        key = key.encode()
    if type(value) != bytes:
        value = value.encode()
    try:
        db = plyvel.DB(database, create_if_missing=True)
        if not db.get(key) or overwrite:
            db.put(key, value)
            success = True
        else:
            success = False
        db.close()
        return {"success": success}
    except Exception as ee:
        import traceback
        import sys
        exc_type, exc_value, exc_tb = sys.exc_info()
        result = "".join(traceback.format_exception(exc_type, exc_value, exc_tb))
        print(result)
        print(f"Database conflicting when writing {key}:{value}")
        time.sleep(0.1)
        db.close()
        return db_write(key, value)

In [19]:
def db_read(key, database = url_database):
    if type(key) != bytes:
        key = key.encode()
    try:
        db = plyvel.DB(database, create_if_missing=True)
        value = db.get(key)
        db.close()
        try: 
            value = value.decode()
        except:
            pass
        return value
    except Exception as ee:
        import traceback
        import sys
        exc_type, exc_value, exc_tb = sys.exc_info()
        result = "".join(traceback.format_exception(exc_type, exc_value, exc_tb))
        print(result)
        
        print(f"Database conflicting when reading {key}")
        time.sleep(0.1)
        db.close()
        return db_read(key)

In [20]:
import random
import string

def get_random_string(length):
    letters = string.ascii_lowercase + string.ascii_uppercase
    result_str = ''.join(random.choice(letters) for i in range(length))
    return result_str


In [42]:
# uvicorn filename:app --port 8001 --workers 5 --proxy-headers

from fastapi import FastAPI, Depends, HTTPException, status, File, UploadFile, HTTPException, Query, Request
from fastapi.security import OAuth2PasswordBearer, OAuth2PasswordRequestForm
from fastapi.responses import HTMLResponse
import uvicorn
import time

import re
import requests

app = FastAPI()

from fastapi.middleware.cors import CORSMiddleware

app.add_middleware(
    CORSMiddleware,
    allow_origins=["*"],
    allow_credentials=True,
    allow_methods=["*"],
    allow_headers=["*"],
)

from pydantic import BaseModel
from typing import Optional

class shorten_url_data(BaseModel):
    url: str
    except_link: Optional[str] = None

@app.post("/api/post/shorten_url/")
async def shorten_url(data: shorten_url_data):
    
    origin_url = data.url
    
    already_have = getAlreadyHave(origin_url)
    if already_have:
        shorten_url = already_have.decode()
    else:
        for i in range(10):
            shorten_url = get_random_string(4+i)
            print(shorten_url)
            result = db_write(shorten_url, origin_url)
            if result["success"] == True:
                break
        #print(f"{origin_url} -> https://pyurl.cc/{shorten_url}")
        writeAlreadyHave(origin_url, shorten_url)
    return {"message": "success",
            "shorten_url": f"https://pyurl.cc/{shorten_url}",
            "origin_url": origin_url}


pattern = r"\b(TelegramBot|TwitterBot|PlurkBot|facebookexternalhit|ZXing|okhttp|jptt|Mo PTT|curl|Wget)\b"

from fastapi.templating import Jinja2Templates
templates = Jinja2Templates(directory="templates")

@app.get("/{url}", response_class=HTMLResponse)
async def get_url(request: Request, url):
    if url == "":
        return {"message": "Home page is still building..."}
    
    redirect_url = db_read(url)
    
    if redirect_url:
        if re.search(pattern, request.headers["user-agent"]): #如果是機器人來抓網頁資訊
            return requests.get(redirect_url).text

        return templates.TemplateResponse("redir.html", {"request": request, "url": redirect_url})



@app.get("/qrcode/")
async def returnQrcode(request: Request, url: str ="Hi"):#token: str = Depends(oauth2_scheme)):
    print("URL", url)
    url = url.replace(" ", "+")
    return templates.TemplateResponse("qrCode.html", {"request": request, "link": url})


In [None]:

import nest_asyncio
nest_asyncio.apply()
import uvicorn
uvicorn.run(app, host="0.0.0.0", port=11133)

INFO:     Started server process [27627]
INFO:     Waiting for application startup.
INFO:     Application startup complete.
INFO:     Uvicorn running on http://0.0.0.0:11133 (Press CTRL+C to quit)


URL https://dev.fio.one/my-apps/821/view/DUUASIOMGPJHMWXQSIBHMHEQDPDLDCCTPGSPMIR9CX9RSTZUZBAFWLBROX9DOGIBGLNLPTJV9INWZ9999 YAZBRNOMFFYHT9BDRGWUXXKYPHESFYOJWFFG9OLKHALUJVQYHKVWBORCOHQPU9EMWPLIPTINKAUD99999
INFO:     127.0.0.1:41094 - "GET /qrcode/?url=https://dev.fio.one/my-apps/821/view/DUUASIOMGPJHMWXQSIBHMHEQDPDLDCCTPGSPMIR9CX9RSTZUZBAFWLBROX9DOGIBGLNLPTJV9INWZ9999+YAZBRNOMFFYHT9BDRGWUXXKYPHESFYOJWFFG9OLKHALUJVQYHKVWBORCOHQPU9EMWPLIPTINKAUD99999 HTTP/1.0" 200 OK
INFO:     127.0.0.1:41096 - "GET /favicon.ico HTTP/1.0" 200 OK
