In [None]:
import json

In [None]:
from pathlib import Path

In [None]:
from dataclasses import dataclass

In [None]:
import sqlite3

In [None]:
import uuid

In [None]:
def VideoParametersToDictonary( videoId : str, title : str, author : str, authorId : str, 
                isLive : bool = False, published : str = "", viewCount : int = 0, 
                lengthSeconds : int = 0, timeAdded : int = 0, paid : bool = False, 
                contentType : str = "video" ):
    return {
        "videoId" : videoId, 
        "title" : title, 
        "author" : author, 
        "authorId" : authorId, 
        "isLive" : isLive, 
        "published" : published, 
        "viewCount" : viewCount, 
        "lengthSeconds" : lengthSeconds, 
        "timeAdded" : timeAdded, 
        "paid" : paid, 
        "type" : contentType 
    }

In [None]:
def PlaylistToDict( playlistName : str, _id : str, videos : [] ): 
    return { 
        "playlistName" : playlistName, 
        "videos" : videos, 
        "_id" : _id
    }

In [None]:
@dataclass
class NewPipeVideo: 
    videoUrl : str
    videoTitle : str
    author : str
    authorId: str

@dataclass
class NewPipePlaylist: 
    playlistId : str
    playlistName : str
    videos : list[ Video ]

In [None]:
def ReadNewPipeData( databasePath ):
    with sqlite3.connect( databasePath ) as connection: 
        cursor = connection.cursor()
        authorIds = {}
        for row in cursor.execute( "SELECT * FROM subscriptions" ).fetchall(): 
            authorIds[ row[ 3 ] ] = row[ 2 ]
        streams = {}
        for row in cursor.execute( "SELECT * FROM streams" ).fetchall(): 
            streams[ row[ 0 ] ] = row
        playlistCursor = cursor.execute( "SELECT * FROM playlists" ).fetchall()
        playlistData = {}
        for row in playlistCursor: 
            playlistData[ row[ 0 ] ] = NewPipePlaylist( row[ 0 ], row[ 1 ], [] )
        for row in cursor.execute( "SELECT * FROM playlist_stream_join" ).fetchall(): 
            stream = streams[ row[ 1 ] ]
            author = stream[ 6 ]
            authorId = authorIds[ author ] if author in authorIds else ""
            playlistData[ row[ 0 ] ].videos.append( NewPipeVideo( stream[ 2 ], stream[ 3 ], author, authorId ) )
        cursor.close()
        return playlistData

In [None]:
newPipePath = Path.cwd().parent / "TestData" / "NewPipeData-20210824_180145" / "newpipe.db"

In [None]:
ReadNewPipeData( str( newPipePath ) )

In [None]:
uuid.uuid4().hex

In [None]:
@dataclass
class FreeTubeVideo: 
    videoId : str
    videoTitle : str
    author : str
    authorId: str

@dataclass
class FreeTubePlaylist: 
    playlistId : str
    playlistName : str
    videos : list[ Video ]

In [None]:
YOUTUBE_VIDEO_ID_PREFIX_CONSTANT = "https://www.youtube.com/watch?v="

In [None]:
YOUTUBE_AUTHOR_ID_PREFIX_CONSTANT = "https://www.youtube.com/channel/"

In [None]:
def NewPipeVideoToFreeTubeVideo( video : NewPipeVideo ): 
    return FreeTubeVideo( 
            video.videoUrl.replace( YOUTUBE_VIDEO_ID_PREFIX_CONSTANT, "" ), 
            video.videoTitle, 
            video.author, 
            video.authorId.replace( YOUTUBE_AUTHOR_ID_PREFIX_CONSTANT, "" )
    )

In [None]:
NewPipeVideoToFreeTubeVideo( ReadNewPipeData( str( newPipePath ) )[ 1 ].videos[ 0 ] )

In [None]:
def NewPipePlaylistToFreeTubePlaylist( playlist : NewPipePlaylist, freeTubePlaylistId = uuid.uuid4().hex ):
    return FreeTubePlaylist( 
        freeTubePlaylistId, 
        playlist.playlistName, 
        [ NewPipeVideoToFreeTubeVideo( video ) 
                 for video in playlist.videos ]
    )


In [None]:
freeTubePlaylists = []
newPipeData = ReadNewPipeData( str( newPipePath ) )
for playlistId in newPipdData: 
    freeTubePlaylists.append( 
            NewPipePlaylistToFreeTubePlaylist( 
                    newPipeData[ playlistId ] ) )

In [None]:
freeTubePlaylists

In [None]:
def FreeTubePlaylistToJSON( playlist ):
    return PlaylistToDict( 
            playlist.playlistName, 
            playlist.playlistId, 
            [
                    VideoParametersToDictonary( 
                            video.videoId, 
                            video.videoTitle, 
                            video.author, 
                            video.authorId 
                    ) for video in playlist.videos
            ]
    )

In [None]:
def FreeTubePlaylistsToJSON( freeTubePlaylists : [] ):
    database = ""
    for playlist in freeTubePlaylists:
        database += json.dumps( FreeTubePlaylistToJSON( playlist ) ) + "\n"
    return database

In [None]:
FreeTubePlaylistsToJSON( freeTubePlaylists )

In [None]:
def NewPipePlaylistsToFreeTubePlaylists( newPipePlaylistsPath, freeTubePlaylistDatabasePath ):
    freeTubePlaylists = []
    newPipeData = ReadNewPipeData( str( newPipePlaylistsPath ) )
    for playlistId in newPipdData: 
        freeTubePlaylists.append( 
                NewPipePlaylistToFreeTubePlaylist( 
                        newPipeData[ playlistId ] ) )
    freeTubeJSON = FreeTubePlaylistsToJSON( freeTubePlaylists )
    with open( str( freeTubePlaylistDatabasePath ), 'a' ) as playlistDatabase: 
        playlistDatabase.writelines( freeTubeJSON )
    return freeTubeJSON


In [None]:
TEST_DATA_PATH_CONSTANT = Path.cwd().parent / "TestData"

In [None]:
NewPipePlaylistsToFreeTubePlaylists( 
        TEST_DATA_PATH_CONSTANT / "NewPipeData-20210824_180145" / "newpipe.db", 
        TEST_DATA_PATH_CONSTANT / "FreeTubeDB" / "playlistsTest.db" 
)

In [None]:
@dataclass
class NewPipeSubscription:
    serviceId : int
    channelName : str
    url : str

def ObtainNewPipeSubscriptionData( databasePath ):
    authorIds = []
    with sqlite3.connect( str( databasePath ) ) as connection: 
        cursor = connection.cursor()
        for row in cursor.execute( "SELECT * FROM subscriptions" ).fetchall(): 
            authorIds.append( NewPipeSubscription( row[ 1 ], row[ 3 ], row[ 2 ] ) )
        cursor.close()
    return authorIds

In [None]:
def WriteNewPipeSubscriptionList( newPipeSubscriptionData : [], 
        appVersion : str = "0.19.8", 
        appVersionInteger : int = 953 ): 
    return {
        "app_version" : appVersion, 
        "app_version_int" : appVersionInteger, 
        "subscriptions" : [ 
                    { 
                        "service_id" : subscription.serviceId, 
                        "url" : subscription.url,
                        "name" : subscription.channelName
                    }
                    for subscription in newPipeSubscriptionData
        ]
    }

In [None]:
def MakeNewPipeSubscriptionJsonromDatabase( databasePath ):
    return json.dumps( 
            WriteNewPipeSubscriptionList( 
                    ObtainNewPipeSubscriptionData( databasePath )
            )
    )

In [None]:
MakeNewPipeSubscriptionJSONFromDatabase( TEST_DATA_PATH_CONSTANT / "NewPipeData-20210824_180145" / "newpipe.db" )

In [None]:
def MakeNewPipeSubscriptionJsonFileFromDatabase( databasePath, newJsonFilePath ):
    jsonData = MakeNewPipeSubscriptionJsonromDatabase( databasePath )
    with open( str( newJsonFilePath ), 'w' ) as file:
        file.writelines( jsonData )
    return jsonData

In [None]:
MakeNewPipeSubscriptionJsonFileFromDatabase( 
        TEST_DATA_PATH_CONSTANT / "NewPipeData-20210824_180145" / "newpipe.db", 
        TEST_DATA_PATH_CONSTANT / "NewPipeSubscriptions" / "subscriptions.json" )

In [None]:
def NewPipeSubscriptionListToFreeTubeSubscritionList( newPipeSubscriptionData : [], 
        profileName : str = "NewPipe Subscriptions", 
        backgroundColor : str = "#FF6D00", 
        textColor : str = "#FFFFFF" ): 
    return {
        "name" : profileName, 
        "bgColor" : backgroundColor, 
        "textColor" : textColor, 
        "subscriptions" : [ 
                    { 
                        "id" : subscription.url.replace( YOUTUBE_AUTHOR_ID_PREFIX_CONSTANT, "" ),
                        "name" : subscription.channelName,
                        "thumbnail" : ""
                    }
                    for subscription in newPipeSubscriptionData
        ]
    }


In [None]:
def MakeNewPipeSubscriptionToFreePipeDatabase( newPipeDatabasePath, freePipeDatabase ):
    jsonData = json.dumps( NewPipeSubscriptionListToFreeTubeSubscritionList( 
                ObtainNewPipeSubscriptionData( newPipeDatabasePath ) ) )
    with open( str( freePipeDatabase ), 'a' ) as file:
        file.writelines( jsonData )
    return jsonData

In [None]:
MakeNewPipeSubscriptionToFreePipeDatabase( 
        TEST_DATA_PATH_CONSTANT / "NewPipeData-20210824_180145" / "newpipe.db", 
        TEST_DATA_PATH_CONSTANT / "FreeTubeDB" / "profilesTest.db" 
)